From fde2161e954a83b279b11dc31a942283dff46927 Mon Sep 17 00:00:00 2001 From: Roel de Jong <12800443+twiggler@users.noreply.github.com> Date: Mon, 28 Oct 2024 12:46:53 +0100 Subject: [PATCH 1/9] Add linux network manager --- dissect/target/helpers/configutil.py | 4 +- dissect/target/helpers/record.py | 20 +- dissect/target/plugins/os/unix/linux/_os.py | 16 +- .../target/plugins/os/unix/linux/network.py | 273 ++++++++++++++++++ .../linux/NetworkManager/vlan.nmconnection | 3 + .../NetworkManager/wired-static.nmconnection | 3 + .../NetworkManager/wireless.nmconnection | 3 + .../unix/linux/systemd.network/20-vlan.netdev | 3 + .../systemd.network/20-wired-static.network | 3 + .../30-wired-static-complex.network | 3 + .../linux/systemd.network/40-wireless.network | 3 + tests/plugins/os/unix/linux/test_network.py | 167 +++++++++++ 12 files changed, 479 insertions(+), 22 deletions(-) create mode 100644 dissect/target/plugins/os/unix/linux/network.py create mode 100644 tests/_data/plugins/os/unix/linux/NetworkManager/vlan.nmconnection create mode 100644 tests/_data/plugins/os/unix/linux/NetworkManager/wired-static.nmconnection create mode 100644 tests/_data/plugins/os/unix/linux/NetworkManager/wireless.nmconnection create mode 100644 tests/_data/plugins/os/unix/linux/systemd.network/20-vlan.netdev create mode 100644 tests/_data/plugins/os/unix/linux/systemd.network/20-wired-static.network create mode 100644 tests/_data/plugins/os/unix/linux/systemd.network/30-wired-static-complex.network create mode 100644 tests/_data/plugins/os/unix/linux/systemd.network/40-wireless.network create mode 100644 tests/plugins/os/unix/linux/test_network.py diff --git a/dissect/target/helpers/configutil.py b/dissect/target/helpers/configutil.py index b794f3873..be9081aff 100644 --- a/dissect/target/helpers/configutil.py +++ b/dissect/target/helpers/configutil.py @@ -891,7 +891,7 @@ def create_parser(self, options: Optional[ParserOptions] = None) -> Configuratio } -def parse(path: Union[FilesystemEntry, TargetPath], hint: Optional[str] = None, *args, **kwargs) -> ConfigParser: +def parse(path: Union[FilesystemEntry, TargetPath], hint: Optional[str] = None, *args, **kwargs) -> ConfigurationParser: """Parses the content of an ``path`` or ``entry`` to a dictionary. Args: @@ -922,7 +922,7 @@ def parse_config( entry: FilesystemEntry, hint: Optional[str] = None, options: Optional[ParserOptions] = None, -) -> ConfigParser: +) -> ConfigurationParser: parser_type = _select_parser(entry, hint) parser = parser_type.create_parser(options) diff --git a/dissect/target/helpers/record.py b/dissect/target/helpers/record.py index c680866cd..2d6d6d7b0 100644 --- a/dissect/target/helpers/record.py +++ b/dissect/target/helpers/record.py @@ -147,31 +147,39 @@ def DynamicDescriptor(types): # noqa ("string", "name"), ("string", "type"), ("boolean", "enabled"), - ("string", "mac"), ("net.ipaddress[]", "dns"), ("net.ipaddress[]", "ip"), ("net.ipaddress[]", "gateway"), + ("net.ipnetwork[]", "network"), ("string", "source"), ] UnixInterfaceRecord = TargetRecordDescriptor( "unix/network/interface", - COMMON_INTERFACE_ELEMENTS, + [ + *COMMON_INTERFACE_ELEMENTS, + ("string[]", "mac"), # We are dealing with possibilities, not reality. There are also bonded interfaces. + ("boolean", "dhcp_ipv4"), # NetworkManager allows for dual-stack configurations. + ("boolean", "dhcp_ipv6"), + ("datetime", "last_connected"), + ("varint[]", "vlan"), + ("string", "configurator"), + ], ) WindowsInterfaceRecord = TargetRecordDescriptor( "windows/network/interface", [ *COMMON_INTERFACE_ELEMENTS, - ("varint", "vlan"), - ("net.ipnetwork[]", "network"), + ("string", "mac"), ("varint", "metric"), ("stringlist", "search_domain"), ("datetime", "first_connected"), ("datetime", "last_connected"), ("net.ipaddress[]", "subnetmask"), ("boolean", "dhcp"), + ("varint", "vlan"), ], ) @@ -179,10 +187,10 @@ def DynamicDescriptor(types): # noqa "macos/network/interface", [ *COMMON_INTERFACE_ELEMENTS, - ("varint", "vlan"), - ("net.ipnetwork[]", "network"), + ("string", "mac"), ("varint", "interface_service_order"), ("boolean", "dhcp"), + ("varint", "vlan"), ], ) diff --git a/dissect/target/plugins/os/unix/linux/_os.py b/dissect/target/plugins/os/unix/linux/_os.py index fb2e6367d..da040596e 100644 --- a/dissect/target/plugins/os/unix/linux/_os.py +++ b/dissect/target/plugins/os/unix/linux/_os.py @@ -6,10 +6,7 @@ from dissect.target.plugin import OperatingSystem, export from dissect.target.plugins.os.unix._os import UnixPlugin from dissect.target.plugins.os.unix.bsd.osx._os import MacPlugin -from dissect.target.plugins.os.unix.linux.network_managers import ( - LinuxNetworkManager, - parse_unix_dhcp_log_messages, -) +from dissect.target.plugins.os.unix.linux.network_managers import LinuxNetworkManager from dissect.target.plugins.os.windows._os import WindowsPlugin from dissect.target.target import Target @@ -33,16 +30,7 @@ def detect(cls, target: Target) -> Filesystem | None: @export(property=True) def ips(self) -> list[str]: - """Returns a list of static IP addresses and DHCP lease IP addresses found on the host system.""" - ips = set() - - for ip_set in self.network_manager.get_config_value("ips"): - ips.update(ip_set) - - for ip in parse_unix_dhcp_log_messages(self.target, iter_all=False): - ips.add(ip) - - return list(ips) + return self.target.network.ips() @export(property=True) def dns(self) -> list[str]: diff --git a/dissect/target/plugins/os/unix/linux/network.py b/dissect/target/plugins/os/unix/linux/network.py new file mode 100644 index 000000000..50bd30aee --- /dev/null +++ b/dissect/target/plugins/os/unix/linux/network.py @@ -0,0 +1,273 @@ +from __future__ import annotations + +import re +from datetime import datetime +from ipaddress import ip_address, ip_interface +from typing import Iterator + +from dissect.target import Target +from dissect.target.helpers import configutil +from dissect.target.helpers.record import UnixInterfaceRecord +from dissect.target.plugins.general.network import NetworkPlugin +from dissect.target.target import TargetPath + + +class LinuxNetworkPlugin(NetworkPlugin): + def _interfaces(self) -> Iterator[UnixInterfaceRecord]: + """Try all available network configuration managers and aggregate the results.""" + for manager_cls in MANAGERS: + manager: LinuxConfigParser = manager_cls(self.target) + yield from manager.interfaces() + + +class LinuxConfigParser: + VlanIdByName = dict[str, int] + + def __init__(self, target: Target): + self._target = target + + def _config_files(self, config_paths: list[str], glob: str) -> Iterator[TargetPath]: + """Yield all configuration files in config_paths matching the given extension.""" + all_files = [] + for config_path in config_paths: + paths = self._target.fs.path(config_path).glob(glob) + all_files.extend(config_file for config_file in paths if config_file.is_file()) + + yield from sorted(all_files, key=lambda p: p.name) + + def interfaces(self) -> Iterator[UnixInterfaceRecord]: + """Parse network interfaces from configuration files.""" + yield from () + + +class NetworkManagerConfigParser(LinuxConfigParser): + config_paths: list[str] = [ + "/etc/NetworkManager/system-connections/", + "/usr/lib/NetworkManager/system-connections/", + "/run/NetworkManager/system-connections/", + ] + + def interfaces(self) -> Iterator[UnixInterfaceRecord]: + connections: dict[str, dict] = {} + vlan_id_by_interface: LinuxConfigParser.VlanIdByName = {} + + for connection_file_path in self._config_files(self.config_paths, "*"): + try: + connection = configutil.parse(connection_file_path, hint="ini") + + common_section: dict[str, str] = connection.get("connection", {}) + interface_type = common_section.get("type", "") + sub_type: dict[str, str] = connection.get(interface_type, {}) + + if interface_type == "vlan": + # Store vlan id by parent interface name + parent_interface = sub_type.get("parent", None) + vlan_id = sub_type.get("id", None) + if parent_interface and vlan_id: + vlan_id_by_interface[parent_interface] = int(vlan_id) + continue + + dns = set[ip_address]() + ip_interfaces: set[ip_interface] = set() + gateways: set[ip_address] = set() + dhcp_settings: dict[str, str] = {"ipv4": "", "ipv6": ""} + + for ip_version in ["ipv4", "ipv6"]: + ip_section: dict[str, str] = connection.get(ip_version, {}) + for key, value in ip_section.items(): + # nmcli inserts a trailling semicolon + if key == "dns" and (stripped := value.rstrip(";")): + dns.update({ip_address(addr) for addr in stripped.split(";")}) + elif key.startswith("address"): + # Undocumented: single gateway on address line. Observed when running: + # nmcli connection add type ethernet ... ip4 192.168.2.138/24 gw4 192.168.2.1 + ip, *gateway = value.split(",", 1) + ip_interfaces.add(ip_interface(ip)) + if gateway: + gateways.add(ip_address(gateway[0])) + elif key.startswith("gateway"): + gateways.add(ip_address(value)) + elif key == "method": + dhcp_settings[ip_version] = value + elif key.startswith("route"): + if gateway := self._parse_route(value): + gateways.add(gateway) + + name = common_section.get("interface-name", None) + mac_address = [sub_type.get("mac-address", "")] if sub_type.get("mac-address", "") else [] + connections[name] = { # Store as dict to allow for clean updating with vlan + "source": str(connection_file_path), + "enabled": None, # Stored in run-time state + "last_connected": self._parse_lastconnected(common_section.get("timestamp", "")), + "name": name, + "mac": mac_address, + "type": interface_type, + "dhcp_ipv4": dhcp_settings.get("ipv4", {}) == "auto", + "dhcp_ipv6": dhcp_settings.get("ipv6", {}) == "auto", + "dns": list(dns), + "ip": [interface.ip for interface in ip_interfaces], + "network": [interface.network for interface in ip_interfaces], + "gateway": list(gateways), + "configurator": "NetworkManager", + } + except Exception as e: + self._target.log.warning("Error parsing network config file %s: %s", connection_file_path, e) + + for parent_interface_name, vlan_id in vlan_id_by_interface.items(): + if parent_connection := connections.get(parent_interface_name): + parent_connection["vlan"] = {vlan_id} + + for connection in connections.values(): + yield UnixInterfaceRecord(**connection) + + def _parse_route(self, route: str) -> ip_address | None: + """Parse a route and return gateway IP address.""" + if (elements := route.split(",")) and len(elements) > 1: + return ip_address(elements[1]) + + return None + + def _parse_lastconnected(self, value: str) -> datetime | None: + """Parse last connected timestamp.""" + if not value: + return None + + timestamp_int = int(value) + return datetime.fromtimestamp(timestamp_int) + + +class SystemdNetworkConfigParser(LinuxConfigParser): + config_paths: list[str] = [ + "/etc/systemd/network/", + "/run/systemd/network/", + "/usr/lib/systemd/network/", + "/usr/local/lib/systemd/network/", + ] + + # Can be enclosed in brackets for IPv6. Can also have port and SNI, which we ignore. + dns_ip_patttern = re.compile(r"((?:\d{1,3}\.){3}\d{1,3})|\[(\[?[0-9a-fA-F:]+\]?)\]") + + def interfaces(self) -> Iterator: + virtual_networks = self._parse_virtual_networks() + yield from self._parse_networks(virtual_networks) + + def _parse_virtual_networks(self) -> LinuxConfigParser.VlanIdByName: + """Parse virtual network configurations from systemd network configuration files.""" + + virtual_networks: LinuxConfigParser.VlanIdByName = {} + for config_file in self._config_files(self.config_paths, "*.netdev"): + try: + virtual_network_config = configutil.parse(config_file, hint="systemd") + net_dev_section: dict[str, str] = virtual_network_config.get("NetDev", {}) + if net_dev_section.get("Kind") != "vlan": + continue + + vlan_id = virtual_network_config.get("VLAN", {}).get("Id") + if (name := net_dev_section.get("Name")) and vlan_id: + virtual_networks[name] = int(vlan_id) + except Exception as e: + self._target.log.warning("Error parsing virtual network config file %s: %s", config_file, e) + + return virtual_networks + + def _parse_networks(self, virtual_networks: LinuxConfigParser.VlanIdByName) -> Iterator[UnixInterfaceRecord]: + """Parse network configurations from systemd network configuration files.""" + for config_file in self._config_files(self.config_paths, "*.network"): + try: + config = configutil.parse(config_file, hint="systemd") + + match_section: dict[str, str] = config.get("Match", {}) + network_section: dict[str, str] = config.get("Network", {}) + link_section: dict[str, str] = config.get("Link", {}) + + ip_interfaces: set[ip_interface] = set() + gateways: set[ip_address] = set() + dns: set[ip_address] = set() + mac_addresses = set[str]() + + dhcp_ipv4, dhcp_ipv6 = self._parse_dhcp(network_section.get("DHCP")) + if link_mac := link_section.get("MACAddress"): + mac_addresses.add(link_mac) + if match_macs := match_section.get("MACAddress"): + mac_addresses.update(match_macs.split(" ")) + if permanent_macs := match_section.get("PermanentMACAddress"): + mac_addresses.update(permanent_macs.split(" ")) + + if dns_value := network_section.get("DNS"): + if isinstance(dns_value, str): + dns_value = [dns_value] + dns.update({self._parse_dns_ip(dns_ip) for dns_ip in dns_value}) + + if address_value := network_section.get("Address"): + if isinstance(address_value, str): + address_value = [address_value] + ip_interfaces.update({ip_interface(addr) for addr in address_value}) + + if gateway_value := network_section.get("Gateway"): + if isinstance(gateway_value, str): + gateway_value = [gateway_value] + gateways.update({ip_address(gateway) for gateway in gateway_value}) + + vlan_values = network_section.get("VLAN", []) + vlan_ids = { + virtual_networks[vlan_name] + for vlan_name in ([vlan_values] if isinstance(vlan_values, str) else vlan_values) + if vlan_name in virtual_networks + } + + # There are possibly multiple route sections, but they are collapsed into one by the parser. + route_section = config.get("Route", {}) + gateway_values = route_section.get("Gateway", []) + if isinstance(gateway_values, str): + gateway_values = [gateway_values] + gateways.update(filter(None, map(self._parse_gateway, gateway_values))) + + yield UnixInterfaceRecord( + source=str(config_file), + type=match_section.get("Type", None), + enabled=None, # Unknown, dependent on run-time state + dhcp_ipv4=dhcp_ipv4, + dhcp_ipv6=dhcp_ipv6, + name=match_section.get("Name", None), + dns=list(dns), + mac=list(mac_addresses), + ip=[interface.ip for interface in ip_interfaces], + network=[interface.network for interface in ip_interfaces], + gateway=list(gateways), + vlan=list(vlan_ids), + configurator="systemd-networkd", + ) + except Exception as e: + self._target.log.warning("Error parsing network config file %s: %s", config_file, e) + + def _parse_dns_ip(self, address: str) -> ip_address: + """Parse DNS address from systemd network configuration file. + + See https://www.freedesktop.org/software/systemd/man/latest/systemd.network.html DNS for details. + """ + + match = self.dns_ip_patttern.search(address) + if match: + return ip_address(match.group(1) or match.group(2)) + else: + raise ValueError(f"Invalid DNS address format: {address}") + + def _parse_dhcp(self, value: str | None) -> tuple[bool, bool]: + """Parse DHCP value from systemd network configuration file to a boolean tuple (ipv4, ipv6).""" + + if value is None or value == "no": + return False, False + elif value == "yes": + return True, True + elif value == "ipv4": + return True, False + elif value == "ipv6": + return False, True + else: + raise ValueError(f"Invalid DHCP value: {value}") + + def _parse_gateway(self, value: str | None) -> ip_address | None: + return None if not value or value in {"_dhcp4", "_ipv6ra"} else ip_address(value) + + +MANAGERS = [NetworkManagerConfigParser, SystemdNetworkConfigParser] diff --git a/tests/_data/plugins/os/unix/linux/NetworkManager/vlan.nmconnection b/tests/_data/plugins/os/unix/linux/NetworkManager/vlan.nmconnection new file mode 100644 index 000000000..b12ae3d7b --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/NetworkManager/vlan.nmconnection @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:6ebf03683c2328a8497a38cbd038184e0760c1334dda413d7f768ab5d9136807 +size 201 diff --git a/tests/_data/plugins/os/unix/linux/NetworkManager/wired-static.nmconnection b/tests/_data/plugins/os/unix/linux/NetworkManager/wired-static.nmconnection new file mode 100644 index 000000000..e5cf3f233 --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/NetworkManager/wired-static.nmconnection @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:887db9ac918cad1d6b62eb796cf97ded4c2791610937fdaa14cf23e872dd8276 +size 409 diff --git a/tests/_data/plugins/os/unix/linux/NetworkManager/wireless.nmconnection b/tests/_data/plugins/os/unix/linux/NetworkManager/wireless.nmconnection new file mode 100644 index 000000000..ddf468f1a --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/NetworkManager/wireless.nmconnection @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:9812f728545918f4c4f1ce519145be73986fab4f9b6140eb6c6ce7a8a11c8c75 +size 130 diff --git a/tests/_data/plugins/os/unix/linux/systemd.network/20-vlan.netdev b/tests/_data/plugins/os/unix/linux/systemd.network/20-vlan.netdev new file mode 100644 index 000000000..6951a194c --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/systemd.network/20-vlan.netdev @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:8ffcbd8f72fc41f21e1702332b5c6a8f2c3fb78085f21a3a3435cb3015d4dc23 +size 48 diff --git a/tests/_data/plugins/os/unix/linux/systemd.network/20-wired-static.network b/tests/_data/plugins/os/unix/linux/systemd.network/20-wired-static.network new file mode 100644 index 000000000..3914e18cf --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/systemd.network/20-wired-static.network @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:2644524e0f127e1ce21751b32fa0d166405dd488fe435f4b7eb08faf7e4a8048 +size 132 diff --git a/tests/_data/plugins/os/unix/linux/systemd.network/30-wired-static-complex.network b/tests/_data/plugins/os/unix/linux/systemd.network/30-wired-static-complex.network new file mode 100644 index 000000000..e54039ba7 --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/systemd.network/30-wired-static-complex.network @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:ef770dec329eb6a7e09987b4c3d472a2e451019463c01e3cd7ad7f3bfb857862 +size 400 diff --git a/tests/_data/plugins/os/unix/linux/systemd.network/40-wireless.network b/tests/_data/plugins/os/unix/linux/systemd.network/40-wireless.network new file mode 100644 index 000000000..c481da488 --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/systemd.network/40-wireless.network @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:23a6c023a9c8422c4ca865557653446c3ba5680a6b6e73f57f5361cc4aba7bc2 +size 50 diff --git a/tests/plugins/os/unix/linux/test_network.py b/tests/plugins/os/unix/linux/test_network.py new file mode 100644 index 000000000..69ddbf1b1 --- /dev/null +++ b/tests/plugins/os/unix/linux/test_network.py @@ -0,0 +1,167 @@ +import os +import posixpath +from datetime import datetime +from ipaddress import ip_address, ip_network +from typing import Counter +from unittest.mock import MagicMock, patch + +from dissect.target import Target +from dissect.target.filesystem import VirtualFilesystem +from dissect.target.plugins.general.network import UnixInterfaceRecord +from dissect.target.plugins.os.unix.linux.network import ( + LinuxConfigParser, + LinuxNetworkPlugin, + NetworkManagerConfigParser, + SystemdNetworkConfigParser, +) + + +def test_networkmanager_parser(target_linux: Target, fs_linux: VirtualFilesystem) -> None: + fixture_dir = "tests/_data/plugins/os/unix/linux/NetworkManager/" + fs_linux.makedirs("/etc/NetworkManager/system-connections") + + fs_linux.map_file( + "/etc/NetworkManager/system-connections/wired-static.nmconnection", + os.path.join(fixture_dir, "wired-static.nmconnection"), + ) + fs_linux.map_file( + "/etc/NetworkManager/system-connections/vlan.nmconnection", + os.path.join(fixture_dir, "vlan.nmconnection"), + ) + fs_linux.map_file( + "/etc/NetworkManager/system-connections/wireless.nmconnection", + os.path.join(fixture_dir, "wireless.nmconnection"), + ) + + network_manager_config_parser = NetworkManagerConfigParser(target_linux) + interfaces = list(network_manager_config_parser.interfaces()) + + assert len(interfaces) == 2 + wired, wireless = interfaces + + assert wired.name == "enp0s3" + assert wired.type == "ethernet" + assert wired.mac == ["08:00:27:5B:4A:EB"] + assert Counter(wired.ip) == Counter([ip_address("192.168.2.138"), ip_address("10.1.1.10")]) + assert wired.dns == [ip_address("88.88.88.88")] + assert Counter(wired.gateway) == Counter( + [ip_address("192.168.2.2"), ip_address("2620:52:0:2219:222:68ff:fe11:5403"), ip_address("192.168.2.3")] + ) + assert Counter(wired.network) == Counter([ip_network("192.168.2.0/24"), ip_network("10.1.0.0/16")]) + assert not wired.dhcp_ipv4 + assert not wired.dhcp_ipv6 + assert wired.enabled is None + assert wired.last_connected == datetime.fromisoformat("2024-10-29 08:59:54+00:00") + assert wired.vlan == [10] + assert wired.source == "/etc/NetworkManager/system-connections/wired-static.nmconnection" + assert wired.configurator == "NetworkManager" + + assert wireless.name == "wlp2s0" + assert wireless.type == "wifi" + assert wireless.mac == [] + assert wireless.ip == [] + assert wireless.dns == [] + assert wireless.gateway == [] + assert wireless.network == [] + assert wireless.dhcp_ipv4 + assert wireless.dhcp_ipv6 + assert wireless.enabled is None + assert wireless.last_connected is None + assert wireless.vlan == [] + assert wireless.source == "/etc/NetworkManager/system-connections/wireless.nmconnection" + assert wireless.configurator == "NetworkManager" + + +def test_systemd_network_parser(target_linux: Target, fs_linux: VirtualFilesystem) -> None: + fixture_dir = "tests/_data/plugins/os/unix/linux/systemd.network/" + fs_linux.makedirs("/etc/systemd/network") + + fs_linux.map_file( + "/etc/systemd/network/20-wired-static.network", posixpath.join(fixture_dir, "20-wired-static.network") + ) + fs_linux.map_file( + "/etc/systemd/network/30-wired-static-complex.network", + posixpath.join(fixture_dir, "30-wired-static-complex.network"), + ) + fs_linux.map_file( + "/usr/lib/systemd/network/40-wireless.network", posixpath.join(fixture_dir, "40-wireless.network") + ) + fs_linux.map_file("/etc/systemd/network/20-vlan.netdev", posixpath.join(fixture_dir, "20-vlan.netdev")) + + systemd_network_config_parser = SystemdNetworkConfigParser(target_linux) + interfaces = list(systemd_network_config_parser.interfaces()) + + assert len(interfaces) == 3 + + wired_static, wired_static_complex, wireless = interfaces + + assert wired_static.name == "enp1s0" + assert wired_static.type is None + assert wired_static.mac == ["aa::bb::cc::dd::ee::ff"] + assert wired_static.ip == [ip_address("10.1.10.9")] + assert wired_static.dns == [ip_address("10.1.10.1")] + assert wired_static.gateway == [ip_address("10.1.10.1")] + assert wired_static.network == [ip_network("10.1.10.0/24")] + assert not wired_static.dhcp_ipv4 + assert not wired_static.dhcp_ipv6 + assert wired_static.enabled is None + assert wired_static.last_connected is None + assert wired_static.vlan == [100] + assert wired_static.source == "/etc/systemd/network/20-wired-static.network" + assert wired_static.configurator == "systemd-networkd" + + assert wired_static_complex.name == "enp1s0" + assert wired_static_complex.type == "ether" + assert Counter(wired_static_complex.mac) == Counter( + ["aa::bb::cc::dd::ee::ff", "ff::ee::dd::cc::bb::aa", "cc::ff::bb::aa::dd", "bb::aa::dd::cc::ff"] + ) + assert Counter(wired_static_complex.ip) == Counter([ip_address("10.1.10.9"), ip_address("10.1.9.10")]) + assert Counter(wired_static_complex.dns) == Counter( + [ip_address("10.1.10.1"), ip_address("10.1.10.2"), ip_address("1111:2222::3333")] + ) + assert Counter(wired_static_complex.gateway) == Counter( + [ip_address("10.1.6.3"), ip_address("10.1.10.2"), ip_address("10.1.9.3")] + ) + assert Counter(wired_static_complex.network) == Counter([ip_network("10.1.0.0/16"), ip_network("10.1.9.0/24")]) + assert not wired_static_complex.dhcp_ipv4 + assert not wired_static_complex.dhcp_ipv6 + assert wired_static_complex.enabled is None + assert wired_static_complex.last_connected is None + assert wired_static_complex.vlan == [] + assert wired_static_complex.source == "/etc/systemd/network/30-wired-static-complex.network" + assert wired_static_complex.configurator == "systemd-networkd" + + assert wireless.name == "wlp2s0" + assert wireless.type == "wifi" + assert wireless.mac == [] + assert wireless.ip == [] + assert wireless.dns == [] + assert wireless.gateway == [] + assert wireless.network == [] + assert wireless.dhcp_ipv4 + assert wireless.dhcp_ipv6 + assert wireless.enabled is None + assert wireless.last_connected is None + assert wired_static_complex.vlan == [] + assert wireless.source == "/usr/lib/systemd/network/40-wireless.network" + assert wired_static_complex.configurator == "systemd-networkd" + + +def test_linux_network_plugin_interfaces(target_linux: Target, fs_linux: VirtualFilesystem) -> None: + """Assert that the LinuxNetworkPlugin aggregates from all Config Parsers.""" + + MockLinuxConfigParser1: LinuxConfigParser = MagicMock() + MockLinuxConfigParser1.return_value.interfaces.return_value = [] + + MockLinuxConfigParser2: LinuxConfigParser = MagicMock() + MockLinuxConfigParser2.return_value.interfaces.return_value = [UnixInterfaceRecord()] + + with patch( + "dissect.target.plugins.os.unix.linux.network.MANAGERS", [MockLinuxConfigParser1, MockLinuxConfigParser2] + ): + linux_network_plugin = LinuxNetworkPlugin(target_linux) + interfaces = list(linux_network_plugin.interfaces()) + + assert len(interfaces) == 1 + MockLinuxConfigParser1.return_value.interfaces.assert_called_once() + MockLinuxConfigParser2.return_value.interfaces.assert_called_once() From ed94566dc83fe13e52b0d2b0ddfdd0f598d6a2ec Mon Sep 17 00:00:00 2001 From: Roel de Jong <12800443+twiggler@users.noreply.github.com> Date: Tue, 5 Nov 2024 09:54:24 +0100 Subject: [PATCH 2/9] fix: make mac address a list. The base class uses this field so we cannot make it a singleton/collection depending on os. --- dissect/target/helpers/record.py | 4 +--- dissect/target/plugins/general/network.py | 2 +- .../target/plugins/os/unix/bsd/osx/network.py | 1 + dissect/target/plugins/os/unix/linux/_os.py | 16 ++++++++++++++-- dissect/target/plugins/os/unix/linux/network.py | 2 ++ dissect/target/plugins/os/windows/network.py | 3 ++- tests/plugins/general/test_network.py | 2 +- tests/plugins/os/unix/bsd/osx/test_network.py | 2 +- tests/plugins/os/windows/test_network.py | 6 +++--- 9 files changed, 26 insertions(+), 12 deletions(-) diff --git a/dissect/target/helpers/record.py b/dissect/target/helpers/record.py index 2d6d6d7b0..54611e6c8 100644 --- a/dissect/target/helpers/record.py +++ b/dissect/target/helpers/record.py @@ -145,6 +145,7 @@ def DynamicDescriptor(types): # noqa COMMON_INTERFACE_ELEMENTS = [ ("string", "name"), + ("string[]", "mac"), ("string", "type"), ("boolean", "enabled"), ("net.ipaddress[]", "dns"), @@ -159,7 +160,6 @@ def DynamicDescriptor(types): # noqa "unix/network/interface", [ *COMMON_INTERFACE_ELEMENTS, - ("string[]", "mac"), # We are dealing with possibilities, not reality. There are also bonded interfaces. ("boolean", "dhcp_ipv4"), # NetworkManager allows for dual-stack configurations. ("boolean", "dhcp_ipv6"), ("datetime", "last_connected"), @@ -172,7 +172,6 @@ def DynamicDescriptor(types): # noqa "windows/network/interface", [ *COMMON_INTERFACE_ELEMENTS, - ("string", "mac"), ("varint", "metric"), ("stringlist", "search_domain"), ("datetime", "first_connected"), @@ -187,7 +186,6 @@ def DynamicDescriptor(types): # noqa "macos/network/interface", [ *COMMON_INTERFACE_ELEMENTS, - ("string", "mac"), ("varint", "interface_service_order"), ("boolean", "dhcp"), ("varint", "vlan"), diff --git a/dissect/target/plugins/general/network.py b/dissect/target/plugins/general/network.py index 236c10c71..8ddf5e4b5 100644 --- a/dissect/target/plugins/general/network.py +++ b/dissect/target/plugins/general/network.py @@ -79,7 +79,7 @@ def with_ip(self, ip_addr: str) -> Iterator[InterfaceRecord]: @internal def with_mac(self, mac: str) -> Iterator[InterfaceRecord]: for interface in self.interfaces(): - if interface.mac == mac: + if mac in interface.mac: yield interface @internal diff --git a/dissect/target/plugins/os/unix/bsd/osx/network.py b/dissect/target/plugins/os/unix/bsd/osx/network.py index 23a9318b6..fc36cc3f9 100644 --- a/dissect/target/plugins/os/unix/bsd/osx/network.py +++ b/dissect/target/plugins/os/unix/bsd/osx/network.py @@ -84,6 +84,7 @@ def _interfaces(self) -> Iterator[MacInterfaceRecord]: network=network, interface_service_order=interface_service_order, dhcp=dhcp, + mac=[], _target=self.target, ) diff --git a/dissect/target/plugins/os/unix/linux/_os.py b/dissect/target/plugins/os/unix/linux/_os.py index da040596e..fb2e6367d 100644 --- a/dissect/target/plugins/os/unix/linux/_os.py +++ b/dissect/target/plugins/os/unix/linux/_os.py @@ -6,7 +6,10 @@ from dissect.target.plugin import OperatingSystem, export from dissect.target.plugins.os.unix._os import UnixPlugin from dissect.target.plugins.os.unix.bsd.osx._os import MacPlugin -from dissect.target.plugins.os.unix.linux.network_managers import LinuxNetworkManager +from dissect.target.plugins.os.unix.linux.network_managers import ( + LinuxNetworkManager, + parse_unix_dhcp_log_messages, +) from dissect.target.plugins.os.windows._os import WindowsPlugin from dissect.target.target import Target @@ -30,7 +33,16 @@ def detect(cls, target: Target) -> Filesystem | None: @export(property=True) def ips(self) -> list[str]: - return self.target.network.ips() + """Returns a list of static IP addresses and DHCP lease IP addresses found on the host system.""" + ips = set() + + for ip_set in self.network_manager.get_config_value("ips"): + ips.update(ip_set) + + for ip in parse_unix_dhcp_log_messages(self.target, iter_all=False): + ips.add(ip) + + return list(ips) @export(property=True) def dns(self) -> list[str]: diff --git a/dissect/target/plugins/os/unix/linux/network.py b/dissect/target/plugins/os/unix/linux/network.py index 50bd30aee..a813e1006 100644 --- a/dissect/target/plugins/os/unix/linux/network.py +++ b/dissect/target/plugins/os/unix/linux/network.py @@ -13,6 +13,8 @@ class LinuxNetworkPlugin(NetworkPlugin): + """Linux network interface plugin.""" + def _interfaces(self) -> Iterator[UnixInterfaceRecord]: """Try all available network configuration managers and aggregate the results.""" for manager_cls in MANAGERS: diff --git a/dissect/target/plugins/os/windows/network.py b/dissect/target/plugins/os/windows/network.py index d29d00a39..d7c39ede8 100644 --- a/dissect/target/plugins/os/windows/network.py +++ b/dissect/target/plugins/os/windows/network.py @@ -281,7 +281,8 @@ def _interfaces(self) -> Iterator[WindowsInterfaceRecord]: pass # Extract the rest of the device information - device_info["mac"] = _try_value(subkey, "NetworkAddress") + if mac_address := _try_value(subkey, "NetworkAddress"): + device_info["mac"] = [mac_address] device_info["vlan"] = _try_value(subkey, "VlanID") if timestamp := _try_value(subkey, "NetworkInterfaceInstallTimestamp"): diff --git a/tests/plugins/general/test_network.py b/tests/plugins/general/test_network.py index a8131235f..3819a3003 100644 --- a/tests/plugins/general/test_network.py +++ b/tests/plugins/general/test_network.py @@ -17,7 +17,7 @@ def network_record(request: pytest.FixtureRequest) -> InterfaceRecord: name="interface_name", type="physical", enabled=True, - mac="DE:AD:BE:EF:00:00", + mac=["DE:AD:BE:EF:00:00"], ip=["10.42.42.10"], gateway=["10.42.42.1"], dns=["8.8.8.8", "1.1.1.1"], diff --git a/tests/plugins/os/unix/bsd/osx/test_network.py b/tests/plugins/os/unix/bsd/osx/test_network.py index d21ea6ec5..b804e12ea 100644 --- a/tests/plugins/os/unix/bsd/osx/test_network.py +++ b/tests/plugins/os/unix/bsd/osx/test_network.py @@ -95,7 +95,7 @@ def dhcp(fake_plist: dict) -> Iterator[dict]: (0, "vlan", ["None"]), (0, "enabled", ["True"]), (0, "interface_service_order", ["0"]), - (0, "mac", ["None"]), + (0, "mac", []), (0, "vlan", ["None"]), ], 1, diff --git a/tests/plugins/os/windows/test_network.py b/tests/plugins/os/windows/test_network.py index 2398e66d7..a9237fcdf 100644 --- a/tests/plugins/os/windows/test_network.py +++ b/tests/plugins/os/windows/test_network.py @@ -271,7 +271,7 @@ def test_windows_network_none( "ip": ["192.168.0.10"], "dns": ["192.168.0.2"], "gateway": ["192.168.0.1"], - "mac": "FE:EE:EE:EE:EE:ED", + "mac": ["FE:EE:EE:EE:EE:ED"], "subnetmask": ["255.255.255.0"], "network": ["192.168.0.0/24"], "first_connected": datetime.fromisoformat("2012-12-21 00:00:00+00:00"), @@ -287,7 +287,7 @@ def test_windows_network_none( "ip": ["10.0.0.10"], "dns": ["10.0.0.2"], "gateway": ["10.0.0.1"], - "mac": "FE:EE:EE:EE:EE:ED", + "mac": ["FE:EE:EE:EE:EE:ED"], "subnetmask": ["255.255.255.0"], "network": ["10.0.0.0/24"], "first_connected": datetime.fromisoformat("2012-12-21 00:00:00+00:00"), @@ -344,7 +344,7 @@ def test_network_dhcp_and_static( ips.update(interface.ip) dns.update(interface.dns) gateways.update(interface.gateway) - macs.add(interface.mac) + macs.update(interface.mac) assert interface.ip == expected["ip"] assert interface.dns == expected["dns"] From 84fb78cbe5483015a67b4acbc3c4eb1bbbec9d99 Mon Sep 17 00:00:00 2001 From: Roel de Jong <12800443+twiggler@users.noreply.github.com> Date: Tue, 5 Nov 2024 10:44:47 +0100 Subject: [PATCH 3/9] fix: convert last connected timestamp to utc --- dissect/target/plugins/os/unix/linux/network.py | 4 ++-- tests/plugins/os/unix/linux/test_network.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dissect/target/plugins/os/unix/linux/network.py b/dissect/target/plugins/os/unix/linux/network.py index a813e1006..c13095e73 100644 --- a/dissect/target/plugins/os/unix/linux/network.py +++ b/dissect/target/plugins/os/unix/linux/network.py @@ -1,7 +1,7 @@ from __future__ import annotations import re -from datetime import datetime +from datetime import datetime, timezone from ipaddress import ip_address, ip_interface from typing import Iterator @@ -135,7 +135,7 @@ def _parse_lastconnected(self, value: str) -> datetime | None: return None timestamp_int = int(value) - return datetime.fromtimestamp(timestamp_int) + return datetime.fromtimestamp(timestamp_int, timezone.utc) class SystemdNetworkConfigParser(LinuxConfigParser): diff --git a/tests/plugins/os/unix/linux/test_network.py b/tests/plugins/os/unix/linux/test_network.py index 69ddbf1b1..f40aba5ed 100644 --- a/tests/plugins/os/unix/linux/test_network.py +++ b/tests/plugins/os/unix/linux/test_network.py @@ -51,7 +51,7 @@ def test_networkmanager_parser(target_linux: Target, fs_linux: VirtualFilesystem assert not wired.dhcp_ipv4 assert not wired.dhcp_ipv6 assert wired.enabled is None - assert wired.last_connected == datetime.fromisoformat("2024-10-29 08:59:54+00:00") + assert wired.last_connected == datetime.fromisoformat("2024-10-29 07:59:54+00:00") assert wired.vlan == [10] assert wired.source == "/etc/NetworkManager/system-connections/wired-static.nmconnection" assert wired.configurator == "NetworkManager" From 9b5cd6ad6aba812582e4aa5e9dc2f1b7eda52da5 Mon Sep 17 00:00:00 2001 From: Roel de Jong <12800443+twiggler@users.noreply.github.com> Date: Tue, 5 Nov 2024 15:57:37 +0100 Subject: [PATCH 4/9] fix: silly use of Iterator --- dissect/target/plugins/os/unix/linux/network.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/dissect/target/plugins/os/unix/linux/network.py b/dissect/target/plugins/os/unix/linux/network.py index c13095e73..9a5956c4f 100644 --- a/dissect/target/plugins/os/unix/linux/network.py +++ b/dissect/target/plugins/os/unix/linux/network.py @@ -28,14 +28,14 @@ class LinuxConfigParser: def __init__(self, target: Target): self._target = target - def _config_files(self, config_paths: list[str], glob: str) -> Iterator[TargetPath]: + def _config_files(self, config_paths: list[str], glob: str) -> list[TargetPath]: """Yield all configuration files in config_paths matching the given extension.""" all_files = [] for config_path in config_paths: paths = self._target.fs.path(config_path).glob(glob) all_files.extend(config_file for config_file in paths if config_file.is_file()) - yield from sorted(all_files, key=lambda p: p.name) + return sorted(all_files, key=lambda p: p.name) def interfaces(self) -> Iterator[UnixInterfaceRecord]: """Parse network interfaces from configuration files.""" @@ -146,7 +146,8 @@ class SystemdNetworkConfigParser(LinuxConfigParser): "/usr/local/lib/systemd/network/", ] - # Can be enclosed in brackets for IPv6. Can also have port and SNI, which we ignore. + # Can be enclosed in brackets for IPv6. Can also have port, iface name, and SNI, which we ignore. + # Example: [1111:2222::3333]:9953%ifname#example.com dns_ip_patttern = re.compile(r"((?:\d{1,3}\.){3}\d{1,3})|\[(\[?[0-9a-fA-F:]+\]?)\]") def interfaces(self) -> Iterator: From ca48ca3cc1f26b95bd4adbf6f52e7712760407c9 Mon Sep 17 00:00:00 2001 From: Roel de Jong <12800443+twiggler@users.noreply.github.com> Date: Tue, 5 Nov 2024 17:28:24 +0100 Subject: [PATCH 5/9] fix: return all NetworkManager connections. - connections with same iface name caused conflicts. - support vlan matching on uuid. --- .../target/plugins/os/unix/linux/network.py | 28 +++++++++++-------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/dissect/target/plugins/os/unix/linux/network.py b/dissect/target/plugins/os/unix/linux/network.py index 9a5956c4f..8c42acb7c 100644 --- a/dissect/target/plugins/os/unix/linux/network.py +++ b/dissect/target/plugins/os/unix/linux/network.py @@ -50,16 +50,16 @@ class NetworkManagerConfigParser(LinuxConfigParser): ] def interfaces(self) -> Iterator[UnixInterfaceRecord]: - connections: dict[str, dict] = {} + connections: list[dict] = [] vlan_id_by_interface: LinuxConfigParser.VlanIdByName = {} for connection_file_path in self._config_files(self.config_paths, "*"): try: - connection = configutil.parse(connection_file_path, hint="ini") + config = configutil.parse(connection_file_path, hint="ini") - common_section: dict[str, str] = connection.get("connection", {}) + common_section: dict[str, str] = config.get("connection", {}) interface_type = common_section.get("type", "") - sub_type: dict[str, str] = connection.get(interface_type, {}) + sub_type: dict[str, str] = config.get(interface_type, {}) if interface_type == "vlan": # Store vlan id by parent interface name @@ -75,7 +75,7 @@ def interfaces(self) -> Iterator[UnixInterfaceRecord]: dhcp_settings: dict[str, str] = {"ipv4": "", "ipv6": ""} for ip_version in ["ipv4", "ipv6"]: - ip_section: dict[str, str] = connection.get(ip_version, {}) + ip_section: dict[str, str] = config.get(ip_version, {}) for key, value in ip_section.items(): # nmcli inserts a trailling semicolon if key == "dns" and (stripped := value.rstrip(";")): @@ -97,7 +97,9 @@ def interfaces(self) -> Iterator[UnixInterfaceRecord]: name = common_section.get("interface-name", None) mac_address = [sub_type.get("mac-address", "")] if sub_type.get("mac-address", "") else [] - connections[name] = { # Store as dict to allow for clean updating with vlan + + connection = { # Store as dict to allow for clean updating with vlan + "uuid": common_section.get("uuid"), # Used for looking up parent of vlan "source": str(connection_file_path), "enabled": None, # Stored in run-time state "last_connected": self._parse_lastconnected(common_section.get("timestamp", "")), @@ -112,15 +114,19 @@ def interfaces(self) -> Iterator[UnixInterfaceRecord]: "gateway": list(gateways), "configurator": "NetworkManager", } + connections.append(connection) + except Exception as e: self._target.log.warning("Error parsing network config file %s: %s", connection_file_path, e) - for parent_interface_name, vlan_id in vlan_id_by_interface.items(): - if parent_connection := connections.get(parent_interface_name): - parent_connection["vlan"] = {vlan_id} + # Unfavorable O(n^2) complexity, but the number of vlans is expected to be small + for connection in connections: + if vlan_id := vlan_id_by_interface.get(connection["name"]) or vlan_id_by_interface.get(connection["uuid"]): + connection["vlan"] = {vlan_id} - for connection in connections.values(): - yield UnixInterfaceRecord(**connection) + for config in connections: + config.pop("uuid", None) + yield UnixInterfaceRecord(**config) def _parse_route(self, route: str) -> ip_address | None: """Parse a route and return gateway IP address.""" From 9e20c3dbad591f1168e4431c91c70bb44d2511e9 Mon Sep 17 00:00:00 2001 From: Roel de Jong <12800443+twiggler@users.noreply.github.com> Date: Wed, 6 Nov 2024 09:42:14 +0100 Subject: [PATCH 6/9] Add docs --- .../target/plugins/os/unix/linux/network.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/dissect/target/plugins/os/unix/linux/network.py b/dissect/target/plugins/os/unix/linux/network.py index 8c42acb7c..785d6530f 100644 --- a/dissect/target/plugins/os/unix/linux/network.py +++ b/dissect/target/plugins/os/unix/linux/network.py @@ -43,6 +43,13 @@ def interfaces(self) -> Iterator[UnixInterfaceRecord]: class NetworkManagerConfigParser(LinuxConfigParser): + """NetworkManager configuration parser. + + NetworkManager configuration files are generally in an INI-like format. + Note that Red Hat and Fedora deprecated ifcfg files. + Documentation: https://networkmanager.dev/docs/api/latest/nm-settings-keyfile.html + """ + config_paths: list[str] = [ "/etc/NetworkManager/system-connections/", "/usr/lib/NetworkManager/system-connections/", @@ -145,6 +152,14 @@ def _parse_lastconnected(self, value: str) -> datetime | None: class SystemdNetworkConfigParser(LinuxConfigParser): + """Systemd network configuration parser. + + Systemd network configuration files are generally in an INI-like format with some quirks. + Note that drop-in directories are not yet supported. + + Documentation: https://www.freedesktop.org/software/systemd/man/latest/systemd.network.html + """ + config_paths: list[str] = [ "/etc/systemd/network/", "/run/systemd/network/", @@ -252,7 +267,8 @@ def _parse_networks(self, virtual_networks: LinuxConfigParser.VlanIdByName) -> I def _parse_dns_ip(self, address: str) -> ip_address: """Parse DNS address from systemd network configuration file. - See https://www.freedesktop.org/software/systemd/man/latest/systemd.network.html DNS for details. + The optional brackets and port number make this hard to parse. + See https://www.freedesktop.org/software/systemd/man/latest/systemd.network.html and search for DNS. """ match = self.dns_ip_patttern.search(address) From d3175f81e507cd96cd9e6302b8072042d4fc6ad0 Mon Sep 17 00:00:00 2001 From: Roel de Jong <12800443+twiggler@users.noreply.github.com> Date: Thu, 7 Nov 2024 14:27:23 +0100 Subject: [PATCH 7/9] Process review comments --- dissect/target/helpers/utils.py | 21 +- .../target/plugins/os/unix/bsd/osx/network.py | 2 +- .../target/plugins/os/unix/linux/network.py | 270 ++++++++++-------- .../systemd.network/40-wireless-ipv4.network | 3 + .../systemd.network/40-wireless-ipv6.network | 3 + tests/helpers/test_utils.py | 18 +- tests/plugins/os/unix/linux/test_network.py | 42 +-- 7 files changed, 211 insertions(+), 148 deletions(-) create mode 100644 tests/_data/plugins/os/unix/linux/systemd.network/40-wireless-ipv4.network create mode 100644 tests/_data/plugins/os/unix/linux/systemd.network/40-wireless-ipv6.network diff --git a/dissect/target/helpers/utils.py b/dissect/target/helpers/utils.py index 0f3578dc9..7648e94e2 100644 --- a/dissect/target/helpers/utils.py +++ b/dissect/target/helpers/utils.py @@ -1,10 +1,12 @@ +from __future__ import annotations + import logging import re import urllib.parse from datetime import datetime, timezone, tzinfo from enum import Enum from pathlib import Path -from typing import BinaryIO, Callable, Iterator, Optional, Union +from typing import BinaryIO, Callable, Iterator, Optional, TypeVar, Union from dissect.util.ts import from_unix @@ -24,6 +26,23 @@ def findall(buf: bytes, needle: bytes) -> Iterator[int]: offset += 1 +T = TypeVar("T") + + +def to_list(value: T | list[T]) -> list[T]: + """Convert a single value or a list of values to a list. + + Args: + value: The value to convert. + + Returns: + A list of values. + """ + if not isinstance(value, list): + return [value] + return value + + class StrEnum(str, Enum): """Sortable and serializible string-based enum""" diff --git a/dissect/target/plugins/os/unix/bsd/osx/network.py b/dissect/target/plugins/os/unix/bsd/osx/network.py index fc36cc3f9..5017db750 100644 --- a/dissect/target/plugins/os/unix/bsd/osx/network.py +++ b/dissect/target/plugins/os/unix/bsd/osx/network.py @@ -89,5 +89,5 @@ def _interfaces(self) -> Iterator[MacInterfaceRecord]: ) except Exception as e: - self.target.log.warning("Error reading configuration for network device %s: %s", name, e) + self.target.log.warning("Error reading configuration for network device %s", name, exc_info=e) continue diff --git a/dissect/target/plugins/os/unix/linux/network.py b/dissect/target/plugins/os/unix/linux/network.py index 785d6530f..dd4d7007f 100644 --- a/dissect/target/plugins/os/unix/linux/network.py +++ b/dissect/target/plugins/os/unix/linux/network.py @@ -1,13 +1,15 @@ from __future__ import annotations import re +from dataclasses import dataclass, field from datetime import datetime, timezone from ipaddress import ip_address, ip_interface -from typing import Iterator +from typing import Iterator, Literal, NamedTuple from dissect.target import Target from dissect.target.helpers import configutil from dissect.target.helpers.record import UnixInterfaceRecord +from dissect.target.helpers.utils import to_list from dissect.target.plugins.general.network import NetworkPlugin from dissect.target.target import TargetPath @@ -18,13 +20,14 @@ class LinuxNetworkPlugin(NetworkPlugin): def _interfaces(self) -> Iterator[UnixInterfaceRecord]: """Try all available network configuration managers and aggregate the results.""" for manager_cls in MANAGERS: - manager: LinuxConfigParser = manager_cls(self.target) + manager: LinuxNetworkConfigParser = manager_cls(self.target) yield from manager.interfaces() -class LinuxConfigParser: - VlanIdByName = dict[str, int] +VlanIdByInterface = dict[str, set[int]] + +class LinuxNetworkConfigParser: def __init__(self, target: Target): self._target = target @@ -35,14 +38,14 @@ def _config_files(self, config_paths: list[str], glob: str) -> list[TargetPath]: paths = self._target.fs.path(config_path).glob(glob) all_files.extend(config_file for config_file in paths if config_file.is_file()) - return sorted(all_files, key=lambda p: p.name) + return sorted(all_files, key=lambda p: p.stem) def interfaces(self) -> Iterator[UnixInterfaceRecord]: """Parse network interfaces from configuration files.""" yield from () -class NetworkManagerConfigParser(LinuxConfigParser): +class NetworkManagerConfigParser(LinuxNetworkConfigParser): """NetworkManager configuration parser. NetworkManager configuration files are generally in an INI-like format. @@ -56,84 +59,76 @@ class NetworkManagerConfigParser(LinuxConfigParser): "/run/NetworkManager/system-connections/", ] + @dataclass + class ParserContext: + source: str + uuid: str | None = None + last_connected: datetime | None = None + name: str | None = None + mac_address: str | None = None + type: str = "" + dns: set[ip_address] = field(default_factory=set) + ip_interfaces: set[ip_interface] = field(default_factory=set) + gateways: set[ip_address] = field(default_factory=set) + dhcp_ipv4: bool = False + dhcp_ipv6: bool = False + vlan: set[int] = field(default_factory=set) + + def to_record(self) -> UnixInterfaceRecord: + return UnixInterfaceRecord( + source=self.source, + last_connected=self.last_connected, + name=self.name, + mac=[self.mac_address] if self.mac_address else [], + type=self.type, + dhcp_ipv4=self.dhcp_ipv4, + dhcp_ipv6=self.dhcp_ipv6, + dns=list(self.dns), + ip=[interface.ip for interface in self.ip_interfaces], + network=[interface.network for interface in self.ip_interfaces], + gateway=list(self.gateways), + vlan=list(self.vlan), + configurator="NetworkManager", + ) + def interfaces(self) -> Iterator[UnixInterfaceRecord]: - connections: list[dict] = [] - vlan_id_by_interface: LinuxConfigParser.VlanIdByName = {} + connections: list[NetworkManagerConfigParser.ParserContext] = [] + vlan_id_by_interface: VlanIdByInterface = {} for connection_file_path in self._config_files(self.config_paths, "*"): try: config = configutil.parse(connection_file_path, hint="ini") - + context = self.ParserContext(source=connection_file_path) common_section: dict[str, str] = config.get("connection", {}) - interface_type = common_section.get("type", "") - sub_type: dict[str, str] = config.get(interface_type, {}) - - if interface_type == "vlan": - # Store vlan id by parent interface name - parent_interface = sub_type.get("parent", None) - vlan_id = sub_type.get("id", None) - if parent_interface and vlan_id: - vlan_id_by_interface[parent_interface] = int(vlan_id) - continue + context.type = common_section.get("type", "") + sub_type: dict[str, str] = config.get(context.type, {}) - dns = set[ip_address]() - ip_interfaces: set[ip_interface] = set() - gateways: set[ip_address] = set() - dhcp_settings: dict[str, str] = {"ipv4": "", "ipv6": ""} + if context.type == "vlan": + self._parse_vlan(sub_type, vlan_id_by_interface) + continue for ip_version in ["ipv4", "ipv6"]: ip_section: dict[str, str] = config.get(ip_version, {}) for key, value in ip_section.items(): - # nmcli inserts a trailling semicolon - if key == "dns" and (stripped := value.rstrip(";")): - dns.update({ip_address(addr) for addr in stripped.split(";")}) - elif key.startswith("address"): - # Undocumented: single gateway on address line. Observed when running: - # nmcli connection add type ethernet ... ip4 192.168.2.138/24 gw4 192.168.2.1 - ip, *gateway = value.split(",", 1) - ip_interfaces.add(ip_interface(ip)) - if gateway: - gateways.add(ip_address(gateway[0])) - elif key.startswith("gateway"): - gateways.add(ip_address(value)) - elif key == "method": - dhcp_settings[ip_version] = value - elif key.startswith("route"): - if gateway := self._parse_route(value): - gateways.add(gateway) - - name = common_section.get("interface-name", None) - mac_address = [sub_type.get("mac-address", "")] if sub_type.get("mac-address", "") else [] - - connection = { # Store as dict to allow for clean updating with vlan - "uuid": common_section.get("uuid"), # Used for looking up parent of vlan - "source": str(connection_file_path), - "enabled": None, # Stored in run-time state - "last_connected": self._parse_lastconnected(common_section.get("timestamp", "")), - "name": name, - "mac": mac_address, - "type": interface_type, - "dhcp_ipv4": dhcp_settings.get("ipv4", {}) == "auto", - "dhcp_ipv6": dhcp_settings.get("ipv6", {}) == "auto", - "dns": list(dns), - "ip": [interface.ip for interface in ip_interfaces], - "network": [interface.network for interface in ip_interfaces], - "gateway": list(gateways), - "configurator": "NetworkManager", - } - connections.append(connection) + self._parse_ip_section_key(key, value, context, ip_version) + + context.name = common_section.get("interface-name") + context.mac_address = sub_type.get("mac-address") + context.uuid = common_section.get("uuid") + context.source = str(connection_file_path) + context.last_connected = self._parse_lastconnected(common_section.get("timestamp", "")) + + connections.append(context) except Exception as e: self._target.log.warning("Error parsing network config file %s: %s", connection_file_path, e) - # Unfavorable O(n^2) complexity, but the number of vlans is expected to be small for connection in connections: - if vlan_id := vlan_id_by_interface.get(connection["name"]) or vlan_id_by_interface.get(connection["uuid"]): - connection["vlan"] = {vlan_id} - - for config in connections: - config.pop("uuid", None) - yield UnixInterfaceRecord(**config) + uuid = vlan_id_by_interface.get(context.uuid) if context.uuid else None + name = vlan_id_by_interface.get(connection.name) if connection.name else None + if vlan_ids := (name or uuid): + connection.vlan.update(vlan_ids) + yield connection.to_record() def _parse_route(self, route: str) -> ip_address | None: """Parse a route and return gateway IP address.""" @@ -142,16 +137,47 @@ def _parse_route(self, route: str) -> ip_address | None: return None - def _parse_lastconnected(self, value: str) -> datetime | None: + def _parse_lastconnected(self, last_connected: str) -> datetime | None: """Parse last connected timestamp.""" - if not value: + if not last_connected: return None - timestamp_int = int(value) - return datetime.fromtimestamp(timestamp_int, timezone.utc) - - -class SystemdNetworkConfigParser(LinuxConfigParser): + return datetime.fromtimestamp(int(last_connected), timezone.utc) + + def _parse_ip_section_key( + self, key: str, value: str, context: ParserContext, ip_version: Literal["ipv4", "ipv6"] + ) -> None: + # nmcli inserts a trailling semicolon + if key == "dns" and (stripped := value.rstrip(";")): + context.dns.update({ip_address(addr) for addr in stripped.split(";")}) + elif key.startswith("address") and (trimmed := value.strip()): + # Undocumented: single gateway on address line. Observed when running: + # nmcli connection add type ethernet ... ip4 192.168.2.138/24 gw4 192.168.2.1 + ip, *gateway = trimmed.split(",", 1) + context.ip_interfaces.add(ip_interface(ip)) + if gateway: + context.gateways.add(ip_address(gateway[0])) + elif key.startswith("gateway") and value: + context.gateways.add(ip_address(value)) + elif key == "method" and ip_version == "ipv4": + context.dhcp_ipv4 = value == "auto" + elif key == "method" and ip_version == "ipv6": + context.dhcp_ipv6 = value == "auto" + elif key.startswith("route"): + if gateway := self._parse_route(value): + context.gateways.add(gateway) + + def _parse_vlan(self, sub_type: dict["str", any], vlan_id_by_interface: VlanIdByInterface) -> None: + parent_interface = sub_type.get("parent", None) + vlan_id = sub_type.get("id", None) + if not parent_interface or not vlan_id: + return + + ids = vlan_id_by_interface.setdefault(parent_interface, set()) + ids.add(int(vlan_id)) + + +class SystemdNetworkConfigParser(LinuxNetworkConfigParser): """Systemd network configuration parser. Systemd network configuration files are generally in an INI-like format with some quirks. @@ -167,18 +193,24 @@ class SystemdNetworkConfigParser(LinuxConfigParser): "/usr/local/lib/systemd/network/", ] + class DhcpConfig(NamedTuple): + ipv4: bool + ipv6: bool + # Can be enclosed in brackets for IPv6. Can also have port, iface name, and SNI, which we ignore. # Example: [1111:2222::3333]:9953%ifname#example.com - dns_ip_patttern = re.compile(r"((?:\d{1,3}\.){3}\d{1,3})|\[(\[?[0-9a-fA-F:]+\]?)\]") + dns_ip_patttern = re.compile( + r"(?P(?:\d{1,3}\.){3}\d{1,3})|\[(?P\[?[0-9a-fA-F:]+\]?)\]" + ) def interfaces(self) -> Iterator: virtual_networks = self._parse_virtual_networks() yield from self._parse_networks(virtual_networks) - def _parse_virtual_networks(self) -> LinuxConfigParser.VlanIdByName: + def _parse_virtual_networks(self) -> VlanIdByInterface: """Parse virtual network configurations from systemd network configuration files.""" - virtual_networks: LinuxConfigParser.VlanIdByName = {} + virtual_networks: VlanIdByInterface = {} for config_file in self._config_files(self.config_paths, "*.netdev"): try: virtual_network_config = configutil.parse(config_file, hint="systemd") @@ -190,11 +222,11 @@ def _parse_virtual_networks(self) -> LinuxConfigParser.VlanIdByName: if (name := net_dev_section.get("Name")) and vlan_id: virtual_networks[name] = int(vlan_id) except Exception as e: - self._target.log.warning("Error parsing virtual network config file %s: %s", config_file, e) + self._target.log.warning("Error parsing virtual network config file %s", config_file, exc_info=e) return virtual_networks - def _parse_networks(self, virtual_networks: LinuxConfigParser.VlanIdByName) -> Iterator[UnixInterfaceRecord]: + def _parse_networks(self, virtual_networks: VlanIdByInterface) -> Iterator[UnixInterfaceRecord]: """Parse network configurations from systemd network configuration files.""" for config_file in self._config_files(self.config_paths, "*.network"): try: @@ -207,45 +239,36 @@ def _parse_networks(self, virtual_networks: LinuxConfigParser.VlanIdByName) -> I ip_interfaces: set[ip_interface] = set() gateways: set[ip_address] = set() dns: set[ip_address] = set() - mac_addresses = set[str]() + mac_addresses: set[str] = set() - dhcp_ipv4, dhcp_ipv6 = self._parse_dhcp(network_section.get("DHCP")) if link_mac := link_section.get("MACAddress"): mac_addresses.add(link_mac) - if match_macs := match_section.get("MACAddress"): - mac_addresses.update(match_macs.split(" ")) - if permanent_macs := match_section.get("PermanentMACAddress"): - mac_addresses.update(permanent_macs.split(" ")) - - if dns_value := network_section.get("DNS"): - if isinstance(dns_value, str): - dns_value = [dns_value] - dns.update({self._parse_dns_ip(dns_ip) for dns_ip in dns_value}) - - if address_value := network_section.get("Address"): - if isinstance(address_value, str): - address_value = [address_value] - ip_interfaces.update({ip_interface(addr) for addr in address_value}) - - if gateway_value := network_section.get("Gateway"): - if isinstance(gateway_value, str): - gateway_value = [gateway_value] - gateways.update({ip_address(gateway) for gateway in gateway_value}) - - vlan_values = network_section.get("VLAN", []) + mac_addresses.update(match_section.get("MACAddress", "").split()) + mac_addresses.update(match_section.get("PermanentMACAddress", "").split()) + + dns_value = to_list(network_section.get("DNS", [])) + dns.update({self._parse_dns_ip(dns_ip) for dns_ip in dns_value}) + + address_value = to_list(network_section.get("Address", [])) + ip_interfaces.update({ip_interface(addr) for addr in address_value}) + + gateway_value = to_list(network_section.get("Gateway", [])) + gateways.update({ip_address(gateway) for gateway in gateway_value}) + + vlan_names = network_section.get("VLAN", []) vlan_ids = { - virtual_networks[vlan_name] - for vlan_name in ([vlan_values] if isinstance(vlan_values, str) else vlan_values) - if vlan_name in virtual_networks + vlan_id + for vlan_name in to_list(vlan_names) + if (vlan_id := virtual_networks.get(vlan_name)) is not None } # There are possibly multiple route sections, but they are collapsed into one by the parser. route_section = config.get("Route", {}) - gateway_values = route_section.get("Gateway", []) - if isinstance(gateway_values, str): - gateway_values = [gateway_values] + gateway_values = to_list(route_section.get("Gateway", [])) gateways.update(filter(None, map(self._parse_gateway, gateway_values))) + dhcp_ipv4, dhcp_ipv6 = self._parse_dhcp(network_section.get("DHCP")) + yield UnixInterfaceRecord( source=str(config_file), type=match_section.get("Type", None), @@ -262,7 +285,7 @@ def _parse_networks(self, virtual_networks: LinuxConfigParser.VlanIdByName) -> I configurator="systemd-networkd", ) except Exception as e: - self._target.log.warning("Error parsing network config file %s: %s", config_file, e) + self._target.log.warning("Error parsing network config file %s", config_file, exc_info=e) def _parse_dns_ip(self, address: str) -> ip_address: """Parse DNS address from systemd network configuration file. @@ -272,27 +295,30 @@ def _parse_dns_ip(self, address: str) -> ip_address: """ match = self.dns_ip_patttern.search(address) - if match: - return ip_address(match.group(1) or match.group(2)) - else: + if not match: raise ValueError(f"Invalid DNS address format: {address}") - def _parse_dhcp(self, value: str | None) -> tuple[bool, bool]: - """Parse DHCP value from systemd network configuration file to a boolean tuple (ipv4, ipv6).""" + return ip_address(match.group("withoutBrackets") or match.group("withBrackets")) + + def _parse_dhcp(self, value: str | None) -> DhcpConfig: + """Parse DHCP value from systemd network configuration file to a named tuple (ipv4, ipv6).""" if value is None or value == "no": - return False, False + return self.DhcpConfig(ipv4=False, ipv6=False) elif value == "yes": - return True, True + return self.DhcpConfig(ipv4=True, ipv6=True) elif value == "ipv4": - return True, False + return self.DhcpConfig(ipv4=True, ipv6=False) elif value == "ipv6": - return False, True - else: - raise ValueError(f"Invalid DHCP value: {value}") + return self.DhcpConfig(ipv4=False, ipv6=True) + + raise ValueError(f"Invalid DHCP value: {value}") def _parse_gateway(self, value: str | None) -> ip_address | None: - return None if not value or value in {"_dhcp4", "_ipv6ra"} else ip_address(value) + if (not value) or (value in {"_dhcp4", "_ipv6ra"}): + return None + + return ip_address(value) MANAGERS = [NetworkManagerConfigParser, SystemdNetworkConfigParser] diff --git a/tests/_data/plugins/os/unix/linux/systemd.network/40-wireless-ipv4.network b/tests/_data/plugins/os/unix/linux/systemd.network/40-wireless-ipv4.network new file mode 100644 index 000000000..292308719 --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/systemd.network/40-wireless-ipv4.network @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:7395907b756bd08ba172bece9ce48c144a142999d0956b5e4dfe436e5119d484 +size 51 diff --git a/tests/_data/plugins/os/unix/linux/systemd.network/40-wireless-ipv6.network b/tests/_data/plugins/os/unix/linux/systemd.network/40-wireless-ipv6.network new file mode 100644 index 000000000..51708ccd9 --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/systemd.network/40-wireless-ipv6.network @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:62a93e614178f653ccf928d2ca658888e029e67beff30ccb0efc88e3df8ac84d +size 51 diff --git a/tests/helpers/test_utils.py b/tests/helpers/test_utils.py index 903abf9c7..8011fda19 100644 --- a/tests/helpers/test_utils.py +++ b/tests/helpers/test_utils.py @@ -8,11 +8,23 @@ from dissect.target.helpers import fsutil, utils -def test_slugify(): +def test_to_list_single_value() -> None: + assert utils.to_list(1) == [1] + assert utils.to_list("a") == ["a"] + assert utils.to_list(None) == [None] + + +def test_to_list_list_value() -> None: + assert utils.to_list([1, 2, 3]) == [1, 2, 3] + assert utils.to_list(["a", "b", "c"]) == ["a", "b", "c"] + assert utils.to_list([]) == [] + + +def test_slugify() -> None: assert utils.slugify("foo/bar\\baz bla") == "foo_bar_baz_bla" -def test_filesystem_readinto(): +def test_filesystem_readinto() -> None: data = b"hello_world" mocked_file = mock_open(read_data=b"hello_world") @@ -22,7 +34,7 @@ def test_filesystem_readinto(): assert len(buffer) == 512 -def test_helpers_fsutil_year_rollover_helper(): +def test_helpers_fsutil_year_rollover_helper() -> None: vfs = VirtualFilesystem() content = """ diff --git a/tests/plugins/os/unix/linux/test_network.py b/tests/plugins/os/unix/linux/test_network.py index f40aba5ed..4be79abd4 100644 --- a/tests/plugins/os/unix/linux/test_network.py +++ b/tests/plugins/os/unix/linux/test_network.py @@ -1,4 +1,3 @@ -import os import posixpath from datetime import datetime from ipaddress import ip_address, ip_network @@ -9,7 +8,7 @@ from dissect.target.filesystem import VirtualFilesystem from dissect.target.plugins.general.network import UnixInterfaceRecord from dissect.target.plugins.os.unix.linux.network import ( - LinuxConfigParser, + LinuxNetworkConfigParser, LinuxNetworkPlugin, NetworkManagerConfigParser, SystemdNetworkConfigParser, @@ -18,20 +17,7 @@ def test_networkmanager_parser(target_linux: Target, fs_linux: VirtualFilesystem) -> None: fixture_dir = "tests/_data/plugins/os/unix/linux/NetworkManager/" - fs_linux.makedirs("/etc/NetworkManager/system-connections") - - fs_linux.map_file( - "/etc/NetworkManager/system-connections/wired-static.nmconnection", - os.path.join(fixture_dir, "wired-static.nmconnection"), - ) - fs_linux.map_file( - "/etc/NetworkManager/system-connections/vlan.nmconnection", - os.path.join(fixture_dir, "vlan.nmconnection"), - ) - fs_linux.map_file( - "/etc/NetworkManager/system-connections/wireless.nmconnection", - os.path.join(fixture_dir, "wireless.nmconnection"), - ) + fs_linux.map_dir("/etc/NetworkManager/system-connections", fixture_dir) network_manager_config_parser = NetworkManagerConfigParser(target_linux) interfaces = list(network_manager_config_parser.interfaces()) @@ -86,14 +72,20 @@ def test_systemd_network_parser(target_linux: Target, fs_linux: VirtualFilesyste fs_linux.map_file( "/usr/lib/systemd/network/40-wireless.network", posixpath.join(fixture_dir, "40-wireless.network") ) + fs_linux.map_file( + "/run/systemd/network/40-wireless-ipv4.network", posixpath.join(fixture_dir, "40-wireless-ipv4.network") + ) + fs_linux.map_file( + "/usr/local/lib/systemd/network/40-wireless-ipv6.network", + posixpath.join(fixture_dir, "40-wireless-ipv6.network"), + ) fs_linux.map_file("/etc/systemd/network/20-vlan.netdev", posixpath.join(fixture_dir, "20-vlan.netdev")) systemd_network_config_parser = SystemdNetworkConfigParser(target_linux) interfaces = list(systemd_network_config_parser.interfaces()) - assert len(interfaces) == 3 - - wired_static, wired_static_complex, wireless = interfaces + assert len(interfaces) == 5 + wired_static, wired_static_complex, wireless, wireless_ipv4, wireless_ipv6 = interfaces assert wired_static.name == "enp1s0" assert wired_static.type is None @@ -146,14 +138,22 @@ def test_systemd_network_parser(target_linux: Target, fs_linux: VirtualFilesyste assert wireless.source == "/usr/lib/systemd/network/40-wireless.network" assert wired_static_complex.configurator == "systemd-networkd" + assert wireless_ipv4.dhcp_ipv4 + assert not wireless_ipv4.dhcp_ipv6 + assert wireless_ipv4.source == "/run/systemd/network/40-wireless-ipv4.network" + + assert not wireless_ipv6.dhcp_ipv4 + assert wireless_ipv6.dhcp_ipv6 + assert wireless_ipv6.source == "/usr/local/lib/systemd/network/40-wireless-ipv6.network" + def test_linux_network_plugin_interfaces(target_linux: Target, fs_linux: VirtualFilesystem) -> None: """Assert that the LinuxNetworkPlugin aggregates from all Config Parsers.""" - MockLinuxConfigParser1: LinuxConfigParser = MagicMock() + MockLinuxConfigParser1: LinuxNetworkConfigParser = MagicMock() MockLinuxConfigParser1.return_value.interfaces.return_value = [] - MockLinuxConfigParser2: LinuxConfigParser = MagicMock() + MockLinuxConfigParser2: LinuxNetworkConfigParser = MagicMock() MockLinuxConfigParser2.return_value.interfaces.return_value = [UnixInterfaceRecord()] with patch( From 336f2df2cd702d6c2737d1e77a3299f26786b72f Mon Sep 17 00:00:00 2001 From: Roel de Jong <12800443+twiggler@users.noreply.github.com> Date: Tue, 12 Nov 2024 13:34:07 +0100 Subject: [PATCH 8/9] fix: edge case multiple vlans bound to same iface --- .../target/plugins/os/unix/linux/network.py | 55 ++++++++++--------- .../linux/NetworkManager/vlan2.nmconnection | 3 + .../linux/systemd.network/20-vlan2.netdev | 3 + tests/plugins/os/unix/linux/test_network.py | 7 ++- 4 files changed, 40 insertions(+), 28 deletions(-) create mode 100644 tests/_data/plugins/os/unix/linux/NetworkManager/vlan2.nmconnection create mode 100644 tests/_data/plugins/os/unix/linux/systemd.network/20-vlan2.netdev diff --git a/dissect/target/plugins/os/unix/linux/network.py b/dissect/target/plugins/os/unix/linux/network.py index dd4d7007f..dfcc42d46 100644 --- a/dissect/target/plugins/os/unix/linux/network.py +++ b/dissect/target/plugins/os/unix/linux/network.py @@ -32,7 +32,7 @@ def __init__(self, target: Target): self._target = target def _config_files(self, config_paths: list[str], glob: str) -> list[TargetPath]: - """Yield all configuration files in config_paths matching the given extension.""" + """Returns all configuration files in config_paths matching the given extension.""" all_files = [] for config_path in config_paths: paths = self._target.fs.path(config_path).glob(glob) @@ -124,10 +124,12 @@ def interfaces(self) -> Iterator[UnixInterfaceRecord]: self._target.log.warning("Error parsing network config file %s: %s", connection_file_path, e) for connection in connections: - uuid = vlan_id_by_interface.get(context.uuid) if context.uuid else None - name = vlan_id_by_interface.get(connection.name) if connection.name else None - if vlan_ids := (name or uuid): - connection.vlan.update(vlan_ids) + vlan_ids_from_interface = vlan_id_by_interface.get(connection.name, set()) + connection.vlan.update(vlan_ids_from_interface) + + vlan_ids_from_uuid = vlan_id_by_interface.get(connection.uuid, set()) + connection.vlan.update(vlan_ids_from_uuid) + yield connection.to_record() def _parse_route(self, route: str) -> ip_address | None: @@ -147,29 +149,32 @@ def _parse_lastconnected(self, last_connected: str) -> datetime | None: def _parse_ip_section_key( self, key: str, value: str, context: ParserContext, ip_version: Literal["ipv4", "ipv6"] ) -> None: - # nmcli inserts a trailling semicolon - if key == "dns" and (stripped := value.rstrip(";")): - context.dns.update({ip_address(addr) for addr in stripped.split(";")}) - elif key.startswith("address") and (trimmed := value.strip()): + if not (trimmed := value.strip()): + return + + if key == "dns": + context.dns.update({ip_address(addr) for addr in trimmed.split(";") if addr}) + elif key.startswith("address"): # Undocumented: single gateway on address line. Observed when running: # nmcli connection add type ethernet ... ip4 192.168.2.138/24 gw4 192.168.2.1 ip, *gateway = trimmed.split(",", 1) context.ip_interfaces.add(ip_interface(ip)) if gateway: context.gateways.add(ip_address(gateway[0])) - elif key.startswith("gateway") and value: - context.gateways.add(ip_address(value)) - elif key == "method" and ip_version == "ipv4": - context.dhcp_ipv4 = value == "auto" - elif key == "method" and ip_version == "ipv6": - context.dhcp_ipv6 = value == "auto" + elif key.startswith("gateway"): + context.gateways.add(ip_address(trimmed)) + elif key == "method": + if ip_version == "ipv4": + context.dhcp_ipv4 = trimmed == "auto" + elif ip_version == "ipv6": + context.dhcp_ipv6 = trimmed == "auto" elif key.startswith("route"): if gateway := self._parse_route(value): context.gateways.add(gateway) def _parse_vlan(self, sub_type: dict["str", any], vlan_id_by_interface: VlanIdByInterface) -> None: - parent_interface = sub_type.get("parent", None) - vlan_id = sub_type.get("id", None) + parent_interface = sub_type.get("parent") + vlan_id = sub_type.get("id") if not parent_interface or not vlan_id: return @@ -220,7 +225,8 @@ def _parse_virtual_networks(self) -> VlanIdByInterface: vlan_id = virtual_network_config.get("VLAN", {}).get("Id") if (name := net_dev_section.get("Name")) and vlan_id: - virtual_networks[name] = int(vlan_id) + vlan_ids = virtual_networks.setdefault(name, set()) + vlan_ids.add(int(vlan_id)) except Exception as e: self._target.log.warning("Error parsing virtual network config file %s", config_file, exc_info=e) @@ -255,15 +261,14 @@ def _parse_networks(self, virtual_networks: VlanIdByInterface) -> Iterator[UnixI gateway_value = to_list(network_section.get("Gateway", [])) gateways.update({ip_address(gateway) for gateway in gateway_value}) - vlan_names = network_section.get("VLAN", []) - vlan_ids = { - vlan_id - for vlan_name in to_list(vlan_names) - if (vlan_id := virtual_networks.get(vlan_name)) is not None - } + vlan_ids: set[int] = set() + vlan_names = to_list(network_section.get("VLAN", [])) + for vlan_name in vlan_names: + if ids := virtual_networks.get(vlan_name): + vlan_ids.update(ids) # There are possibly multiple route sections, but they are collapsed into one by the parser. - route_section = config.get("Route", {}) + route_section: dict[str, any] = config.get("Route", {}) gateway_values = to_list(route_section.get("Gateway", [])) gateways.update(filter(None, map(self._parse_gateway, gateway_values))) diff --git a/tests/_data/plugins/os/unix/linux/NetworkManager/vlan2.nmconnection b/tests/_data/plugins/os/unix/linux/NetworkManager/vlan2.nmconnection new file mode 100644 index 000000000..fa6cf64b8 --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/NetworkManager/vlan2.nmconnection @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:4b0ea3831e424cb610acd39ef1662f9a63d625b90fc962558d9cb31df9af7d00 +size 231 diff --git a/tests/_data/plugins/os/unix/linux/systemd.network/20-vlan2.netdev b/tests/_data/plugins/os/unix/linux/systemd.network/20-vlan2.netdev new file mode 100644 index 000000000..bdc099e64 --- /dev/null +++ b/tests/_data/plugins/os/unix/linux/systemd.network/20-vlan2.netdev @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:38c91d7d3f1f55765d6da2bb76863f8f5be659f28c3e1204a9a17df683acdc58 +size 48 diff --git a/tests/plugins/os/unix/linux/test_network.py b/tests/plugins/os/unix/linux/test_network.py index 4be79abd4..e29f0c3a1 100644 --- a/tests/plugins/os/unix/linux/test_network.py +++ b/tests/plugins/os/unix/linux/test_network.py @@ -38,7 +38,7 @@ def test_networkmanager_parser(target_linux: Target, fs_linux: VirtualFilesystem assert not wired.dhcp_ipv6 assert wired.enabled is None assert wired.last_connected == datetime.fromisoformat("2024-10-29 07:59:54+00:00") - assert wired.vlan == [10] + assert Counter(wired.vlan) == Counter([10, 11]) assert wired.source == "/etc/NetworkManager/system-connections/wired-static.nmconnection" assert wired.configurator == "NetworkManager" @@ -80,6 +80,7 @@ def test_systemd_network_parser(target_linux: Target, fs_linux: VirtualFilesyste posixpath.join(fixture_dir, "40-wireless-ipv6.network"), ) fs_linux.map_file("/etc/systemd/network/20-vlan.netdev", posixpath.join(fixture_dir, "20-vlan.netdev")) + fs_linux.map_file("/etc/systemd/network/20-vlan2.netdev", posixpath.join(fixture_dir, "20-vlan2.netdev")) systemd_network_config_parser = SystemdNetworkConfigParser(target_linux) interfaces = list(systemd_network_config_parser.interfaces()) @@ -98,7 +99,7 @@ def test_systemd_network_parser(target_linux: Target, fs_linux: VirtualFilesyste assert not wired_static.dhcp_ipv6 assert wired_static.enabled is None assert wired_static.last_connected is None - assert wired_static.vlan == [100] + assert Counter(wired_static.vlan) == Counter([100, 101]) assert wired_static.source == "/etc/systemd/network/20-wired-static.network" assert wired_static.configurator == "systemd-networkd" @@ -147,7 +148,7 @@ def test_systemd_network_parser(target_linux: Target, fs_linux: VirtualFilesyste assert wireless_ipv6.source == "/usr/local/lib/systemd/network/40-wireless-ipv6.network" -def test_linux_network_plugin_interfaces(target_linux: Target, fs_linux: VirtualFilesystem) -> None: +def test_linux_network_plugin_interfaces(target_linux: Target) -> None: """Assert that the LinuxNetworkPlugin aggregates from all Config Parsers.""" MockLinuxConfigParser1: LinuxNetworkConfigParser = MagicMock() From 608f8c3fa39fe1e21b27f8dac107b0303157a2fd Mon Sep 17 00:00:00 2001 From: Roel de Jong <12800443+twiggler@users.noreply.github.com> Date: Wed, 20 Nov 2024 09:39:54 +0100 Subject: [PATCH 9/9] Fix type annotation and logging --- .../target/plugins/os/unix/linux/network.py | 63 +++++++++++-------- 1 file changed, 36 insertions(+), 27 deletions(-) diff --git a/dissect/target/plugins/os/unix/linux/network.py b/dissect/target/plugins/os/unix/linux/network.py index dfcc42d46..a4e55d296 100644 --- a/dissect/target/plugins/os/unix/linux/network.py +++ b/dissect/target/plugins/os/unix/linux/network.py @@ -4,14 +4,21 @@ from dataclasses import dataclass, field from datetime import datetime, timezone from ipaddress import ip_address, ip_interface -from typing import Iterator, Literal, NamedTuple +from typing import TYPE_CHECKING, Any, Iterator, Literal, NamedTuple -from dissect.target import Target from dissect.target.helpers import configutil from dissect.target.helpers.record import UnixInterfaceRecord from dissect.target.helpers.utils import to_list from dissect.target.plugins.general.network import NetworkPlugin -from dissect.target.target import TargetPath + +if TYPE_CHECKING: + from ipaddress import IPv4Address, IPv4Interface, IPv6Address, IPv6Interface + + from dissect.target import Target + from dissect.target.target import TargetPath + + NetAddress = IPv4Address | IPv6Address + NetInterface = IPv4Interface | IPv6Interface class LinuxNetworkPlugin(NetworkPlugin): @@ -67,9 +74,9 @@ class ParserContext: name: str | None = None mac_address: str | None = None type: str = "" - dns: set[ip_address] = field(default_factory=set) - ip_interfaces: set[ip_interface] = field(default_factory=set) - gateways: set[ip_address] = field(default_factory=set) + dns: set[NetAddress] = field(default_factory=set) + ip_interfaces: set[NetInterface] = field(default_factory=set) + gateways: set[NetAddress] = field(default_factory=set) dhcp_ipv4: bool = False dhcp_ipv6: bool = False vlan: set[int] = field(default_factory=set) @@ -121,7 +128,8 @@ def interfaces(self) -> Iterator[UnixInterfaceRecord]: connections.append(context) except Exception as e: - self._target.log.warning("Error parsing network config file %s: %s", connection_file_path, e) + self._target.log.warning("Error parsing network config file %s", connection_file_path) + self._target.log.debug("", exc_info=e) for connection in connections: vlan_ids_from_interface = vlan_id_by_interface.get(connection.name, set()) @@ -132,7 +140,7 @@ def interfaces(self) -> Iterator[UnixInterfaceRecord]: yield connection.to_record() - def _parse_route(self, route: str) -> ip_address | None: + def _parse_route(self, route: str) -> NetAddress | None: """Parse a route and return gateway IP address.""" if (elements := route.split(",")) and len(elements) > 1: return ip_address(elements[1]) @@ -153,7 +161,7 @@ def _parse_ip_section_key( return if key == "dns": - context.dns.update({ip_address(addr) for addr in trimmed.split(";") if addr}) + context.dns.update(ip_address(addr) for addr in trimmed.split(";") if addr) elif key.startswith("address"): # Undocumented: single gateway on address line. Observed when running: # nmcli connection add type ethernet ... ip4 192.168.2.138/24 gw4 192.168.2.1 @@ -172,7 +180,7 @@ def _parse_ip_section_key( if gateway := self._parse_route(value): context.gateways.add(gateway) - def _parse_vlan(self, sub_type: dict["str", any], vlan_id_by_interface: VlanIdByInterface) -> None: + def _parse_vlan(self, sub_type: dict[str, Any], vlan_id_by_interface: VlanIdByInterface) -> None: parent_interface = sub_type.get("parent") vlan_id = sub_type.get("id") if not parent_interface or not vlan_id: @@ -228,7 +236,8 @@ def _parse_virtual_networks(self) -> VlanIdByInterface: vlan_ids = virtual_networks.setdefault(name, set()) vlan_ids.add(int(vlan_id)) except Exception as e: - self._target.log.warning("Error parsing virtual network config file %s", config_file, exc_info=e) + self._target.log.warning("Error parsing virtual network config file %s", config_file) + self._target.log.debug("", exc_info=e) return virtual_networks @@ -242,9 +251,9 @@ def _parse_networks(self, virtual_networks: VlanIdByInterface) -> Iterator[UnixI network_section: dict[str, str] = config.get("Network", {}) link_section: dict[str, str] = config.get("Link", {}) - ip_interfaces: set[ip_interface] = set() - gateways: set[ip_address] = set() - dns: set[ip_address] = set() + ip_interfaces: set[NetInterface] = set() + gateways: set[NetAddress] = set() + dns: set[NetAddress] = set() mac_addresses: set[str] = set() if link_mac := link_section.get("MACAddress"): @@ -253,13 +262,13 @@ def _parse_networks(self, virtual_networks: VlanIdByInterface) -> Iterator[UnixI mac_addresses.update(match_section.get("PermanentMACAddress", "").split()) dns_value = to_list(network_section.get("DNS", [])) - dns.update({self._parse_dns_ip(dns_ip) for dns_ip in dns_value}) + dns.update(self._parse_dns_ip(dns_ip) for dns_ip in dns_value) address_value = to_list(network_section.get("Address", [])) - ip_interfaces.update({ip_interface(addr) for addr in address_value}) + ip_interfaces.update(ip_interface(addr) for addr in address_value) gateway_value = to_list(network_section.get("Gateway", [])) - gateways.update({ip_address(gateway) for gateway in gateway_value}) + gateways.update(ip_address(gateway) for gateway in gateway_value) vlan_ids: set[int] = set() vlan_names = to_list(network_section.get("VLAN", [])) @@ -268,7 +277,7 @@ def _parse_networks(self, virtual_networks: VlanIdByInterface) -> Iterator[UnixI vlan_ids.update(ids) # There are possibly multiple route sections, but they are collapsed into one by the parser. - route_section: dict[str, any] = config.get("Route", {}) + route_section: dict[str, Any] = config.get("Route", {}) gateway_values = to_list(route_section.get("Gateway", [])) gateways.update(filter(None, map(self._parse_gateway, gateway_values))) @@ -276,11 +285,11 @@ def _parse_networks(self, virtual_networks: VlanIdByInterface) -> Iterator[UnixI yield UnixInterfaceRecord( source=str(config_file), - type=match_section.get("Type", None), + type=match_section.get("Type"), enabled=None, # Unknown, dependent on run-time state dhcp_ipv4=dhcp_ipv4, dhcp_ipv6=dhcp_ipv6, - name=match_section.get("Name", None), + name=match_section.get("Name"), dns=list(dns), mac=list(mac_addresses), ip=[interface.ip for interface in ip_interfaces], @@ -290,20 +299,20 @@ def _parse_networks(self, virtual_networks: VlanIdByInterface) -> Iterator[UnixI configurator="systemd-networkd", ) except Exception as e: - self._target.log.warning("Error parsing network config file %s", config_file, exc_info=e) + self._target.log.warning("Error parsing network config file %s", config_file) + self._target.log.debug("", exc_info=e) - def _parse_dns_ip(self, address: str) -> ip_address: + def _parse_dns_ip(self, address: str) -> NetAddress: """Parse DNS address from systemd network configuration file. The optional brackets and port number make this hard to parse. See https://www.freedesktop.org/software/systemd/man/latest/systemd.network.html and search for DNS. """ - match = self.dns_ip_patttern.search(address) - if not match: - raise ValueError(f"Invalid DNS address format: {address}") + if match := self.dns_ip_patttern.search(address): + return ip_address(match.group("withoutBrackets") or match.group("withBrackets")) - return ip_address(match.group("withoutBrackets") or match.group("withBrackets")) + raise ValueError(f"Invalid DNS address format: {address}") def _parse_dhcp(self, value: str | None) -> DhcpConfig: """Parse DHCP value from systemd network configuration file to a named tuple (ipv4, ipv6).""" @@ -319,7 +328,7 @@ def _parse_dhcp(self, value: str | None) -> DhcpConfig: raise ValueError(f"Invalid DHCP value: {value}") - def _parse_gateway(self, value: str | None) -> ip_address | None: + def _parse_gateway(self, value: str | None) -> NetAddress | None: if (not value) or (value in {"_dhcp4", "_ipv6ra"}): return None