From e13bc120b709c93ef7f6a4db6db3e7e5e118936d Mon Sep 17 00:00:00 2001
From: Stefan de Reuver <9864602+Horofic@users.noreply.github.com>
Date: Mon, 25 Mar 2024 11:28:23 +0100
Subject: [PATCH 1/3] Change socket tests to use NetSocket plugin
---
 .../target/plugins/os/unix/linux/sockets.py | 4 +-
 tests/plugins/os/unix/linux/test_sockets.py | 160 ++++++------------
 2 files changed, 54 insertions(+), 110 deletions(-)
diff --git a/dissect/target/plugins/os/unix/linux/sockets.py b/dissect/target/plugins/os/unix/linux/sockets.py
index b0e582a51..ebcc47e3e 100644
--- a/dissect/target/plugins/os/unix/linux/sockets.py
+++ b/dissect/target/plugins/os/unix/linux/sockets.py
@@ -17,9 +17,9 @@
  ("uint32", "rx_queue"),
  ("uint32", "tx_queue"),
  ("string", "local_ip"),
- ("string", "local_port"),
+ ("uint16", "local_port"),
  ("string", "remote_ip"),
- ("string", "remote_port"),
+ ("uint16", "remote_port"),
  ("string", "state"),
  ("string", "owner"),
  ("uint32", "inode"),
diff --git a/tests/plugins/os/unix/linux/test_sockets.py b/tests/plugins/os/unix/linux/test_sockets.py
index 84282c053..edefa9430 100644
--- a/tests/plugins/os/unix/linux/test_sockets.py
+++ b/tests/plugins/os/unix/linux/test_sockets.py
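Review bot: Typing `local_port`/`remote_port` as `uint16` fits the TCP and UDP tables, but the raw-socket tests in this same patch assert `local_port == 253` (and the removed `test_raw6` asserted `58`), which look like IP protocol numbers rather than ports; if this record descriptor is also used for the raw table, that overloading should be stated next to the field, e.g. `# for raw sockets the port columns carry the IP protocol number, not a port — confirm`. Without such a note a reader of the record will take a raw-socket "port" at face value when correlating listeners with process data. The descriptor being edited starts and ends outside this hunk.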
@@ -11,155 +11,102 @@ def test_sockets_plugin(target_linux_users: Target, fs_linux_proc_sockets: Virtu def test_tcp(target_linux_users: Target, fs_linux_proc_sockets: VirtualFilesystem): target_linux_users.add_plugin(ProcPlugin) - results = list(target_linux_users.proc.sockets.tcp()) - assert len(results) == 4 - assert results[0].local_ip.exploded == "0.0.0.0" + results = list(target_linux_users.sockets.tcp()) + + assert len(results) == 7 + assert results[0].local_ip == "0.0.0.0" assert results[0].local_port == 22 - assert results[0].remote_ip.exploded == "0.0.0.0" + assert results[0].remote_ip == "0.0.0.0" assert results[0].remote_port == 0 - assert results[2].local_ip.exploded == "127.0.0.1" + assert results[2].local_ip == "127.0.0.1" assert results[2].local_port == 631 - assert results[2].remote_ip.exploded == "0.0.0.0" + assert results[2].remote_ip == "0.0.0.0" assert results[2].remote_port == 0 - assert results[3].local_ip.exploded == "172.16.64.136" + assert results[3].local_ip == "172.16.64.136" assert results[3].local_port == 22 - assert results[3].remote_ip.exploded == "172.16.64.1" + assert results[3].remote_ip == "172.16.64.1" assert results[3].remote_port == 49442 - for result in results: - assert result.protocol_string == "tcp" - assert result.owner == "root" - assert result.pid in (0, 2, 1337) - assert result.inode in (1337, 1338, 0) - assert result.cmdline in ("", None, "acquire -p full --proc") + assert results[4].local_ip == "::" + assert results[4].local_port == 22 + assert results[4].remote_ip == "::" + assert results[4].remote_port == 0 - -def test_tcp6(target_linux_users: Target, fs_linux_proc_sockets: VirtualFilesystem): - target_linux_users.add_plugin(ProcPlugin) - results = list(target_linux_users.proc.sockets.tcp6()) - - assert len(results) == 3 - assert results[0].local_ip.exploded == "0000:0000:0000:0000:0000:0000:0000:0000" - assert results[0].local_port == 22 - assert results[0].remote_ip.exploded == 
"0000:0000:0000:0000:0000:0000:0000:0000" - assert results[0].remote_port == 0 - - assert results[2].local_ip.exploded == "0000:0000:0000:0000:0000:0000:0000:0001" - assert results[2].local_port == 631 - assert results[2].remote_ip.exploded == "0000:0000:0000:0000:0000:0000:0000:0000" - assert results[2].remote_port == 0 + assert results[6].local_ip == "::1" + assert results[6].local_port == 631 + assert results[6].remote_ip == "::" + assert results[6].remote_port == 0 for result in results: - assert result.protocol_string == "tcp6" + assert result.protocol in ("tcp", "tcp6") assert result.owner == "root" - assert result.pid in (0, 2, 1337) + assert result.pid in (0, 1, 2, 1337) assert result.inode in (1337, 1338, 0) - assert result.cmdline in ("", None, "acquire -p full --proc") + assert result.cmdline in ("", None, "acquire -p full --proc", "test cmdline") def test_udp(target_linux_users: Target, fs_linux_proc_sockets: VirtualFilesystem): target_linux_users.add_plugin(ProcPlugin) - results = list(target_linux_users.proc.sockets.udp()) + results = list(target_linux_users.sockets.udp()) - assert len(results) == 4 - assert results[0].local_ip.exploded == "172.16.64.136" + assert len(results) == 7 + assert results[0].local_ip == "172.16.64.136" assert results[0].local_port == 68 - assert results[0].remote_ip.exploded == "172.16.64.254" + assert results[0].remote_ip == "172.16.64.254" assert results[0].remote_port == 67 - assert results[2].local_ip.exploded == "0.0.0.0" + assert results[2].local_ip == "0.0.0.0" assert results[2].local_port == 58569 - assert results[2].remote_ip.exploded == "0.0.0.0" + assert results[2].remote_ip == "0.0.0.0" assert results[2].remote_port == 0 - assert results[3].local_ip.exploded == "0.0.0.0" + assert results[3].local_ip == "0.0.0.0" assert results[3].local_port == 5353 - assert results[3].remote_ip.exploded == "0.0.0.0" + assert results[3].remote_ip == "0.0.0.0" assert results[3].remote_port == 0 - for result in results: - assert 
result.protocol_string == "udp" - assert result.owner in ("root", "110") - assert result.pid in (0, 2, 3, 1337) - assert result.inode in (1337, 1338, 1339, 0) - assert result.cmdline in ("", None, "acquire -p full --proc", "sshd") - - -def test_udp6(target_linux_users: Target, fs_linux_proc_sockets: VirtualFilesystem): - target_linux_users.add_plugin(ProcPlugin) - results = list(target_linux_users.proc.sockets.udp6()) - - assert len(results) == 3 - assert results[0].local_ip.exploded == "0000:0000:0000:0000:0000:0000:0000:0000" - assert results[0].local_port == 59613 - assert results[0].remote_ip.exploded == "0000:0000:0000:0000:0000:0000:0000:0000" - assert results[0].remote_port == 0 - - assert results[2].local_ip.exploded == "0000:0000:0000:0000:0000:0000:0000:0000" - assert results[2].local_port == 5353 - assert results[2].remote_ip.exploded == "0000:0000:0000:0000:0000:0000:0000:0000" - assert results[2].remote_port == 0 + assert results[4].local_ip == "::" + assert results[4].local_port == 59613 + assert results[4].remote_ip == "::" + assert results[4].remote_port == 0 for result in results: - assert result.protocol_string == "udp6" - assert result.owner == "110" - assert result.pid in (0, 2, 3, 1337) + assert result.protocol in ("udp", "udp6") + assert result.owner in ("root", "110") + assert result.pid in (0, 1, 2, 3, 1337) assert result.inode in (1337, 1338, 1339, 0) - assert result.cmdline in ("", None, "acquire -p full --proc", "sshd") + assert result.cmdline in ("", None, "acquire -p full --proc", "sshd", "test cmdline") def test_raw(target_linux_users: Target, fs_linux_proc_sockets: VirtualFilesystem): target_linux_users.add_plugin(ProcPlugin) - results = list(target_linux_users.proc.sockets.raw()) + results = list(target_linux_users.sockets.raw()) - assert len(results) == 2 - assert results[0].local_ip.exploded == "0.0.0.0" + assert len(results) == 4 + assert results[0].local_ip == "0.0.0.0" assert results[0].local_port == 253 - assert 
results[0].remote_ip.exploded == "0.0.0.0" + assert results[0].remote_ip == "0.0.0.0" assert results[0].remote_port == 0 - assert results[1].local_ip.exploded == "0.0.0.0" + assert results[1].local_ip == "0.0.0.0" assert results[1].local_port == 253 - assert results[1].remote_ip.exploded == "0.0.0.0" - assert results[1].remote_port == 0 - - for result in results: - assert result.protocol_string == "raw" - assert result.owner == "root" - assert result.pid in (1, 1337) - assert result.inode == (1337) - assert result.cmdline == "acquire -p full --proc" - - -def test_raw6(target_linux_users: Target, fs_linux_proc_sockets: VirtualFilesystem): - target_linux_users.add_plugin(ProcPlugin) - results = list(target_linux_users.proc.sockets.raw6()) - - assert len(results) == 2 - assert results[0].local_ip.exploded == "0000:0000:0000:0000:0000:0000:0000:0000" - assert results[0].local_port == 58 - assert results[0].remote_ip.exploded == "0000:0000:0000:0000:0000:0000:0000:0000" - assert results[0].remote_port == 0 - - assert results[1].local_ip.exploded == "0000:0000:0000:0000:0000:0000:0000:0000" - assert results[1].local_port == 58 - assert results[1].remote_ip.exploded == "0000:0000:0000:0000:0000:0000:0000:0000" + assert results[1].remote_ip == "0.0.0.0" assert results[1].remote_port == 0 for result in results: - assert result.protocol_string == "raw6" + assert result.protocol in ("raw", "raw6") assert result.owner == "root" assert result.pid in (1, 1337) assert result.inode == (1337) - assert result.cmdline == "acquire -p full --proc" + assert result.cmdline in ("acquire -p full --proc", "test cmdline") def test_packet(target_linux_users: Target, fs_linux_proc_sockets: VirtualFilesystem): target_linux_users.add_plugin(ProcPlugin) - results = list(target_linux_users.proc.sockets.packet()) + results = list(target_linux_users.sockets.packet()) assert len(results) == 2 
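Review bot: Folding the IPv4 and IPv6 cases into one result list makes the `for result in results:` loops weaker than the tests they replace: `assert result.pid in (0, 1, 2, 1337)` and `assert result.cmdline in ("", None, "acquire -p full --proc", "test cmdline")` now accept any pid/cmdline combination, so a record whose owner process no longer matches its command line still passes, and `results[5]` in `test_tcp` and `test_udp` is never inspected at all. The indexed assertions also rely on the plugin yielding the IPv4 records before the IPv6 ones, which is implied by the indices but never checked. Pinning the split once per test, e.g. `assert [r.protocol for r in results] == ["tcp"] * 4 + ["tcp6"] * 3` (udp 4 + 3, raw 2 + 2, matching the old per-family counts), would document that ordering assumption and catch a reordering, and the pid/cmdline pairs could be asserted per index as before. The same loosening applies to `test_packet` in the next hunk (`assert result.pid in (1, 1337)` alongside the widened cmdline tuple).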
@@ -167,26 +114,23 @@ def test_packet(target_linux_users: Target, fs_linux_proc_sockets: VirtualFilesy assert result.ref == 3 assert result.type == 3 # ETH_P_ALL assert result.protocol_type == "ETH_P_ALL" - assert result.protocol_string == "packet" - assert result.cmdline == "acquire -p full --proc" - assert result.pid == 1337 + assert result.protocol == "packet" + assert result.cmdline in ("acquire -p full --proc", "test cmdline") + assert result.pid in (1, 1337) assert result.owner == "root" def test_unix(target_linux_users: Target, fs_linux_proc_sockets: VirtualFilesystem): target_linux_users.add_plugin(ProcPlugin) - results = list(target_linux_users.proc.sockets.unix()) + results = list(target_linux_users.sockets.unix()) assert len(results) == 4 for result in results: - assert result.protocol_string == "unix" assert result.ref in (2, 3) - assert result.protocol == 0 - assert result.type == 1 - assert result.state in (1, 3) + assert result.protocol == "unix" + assert result.type == "STREAM" + assert result.state in ("LISTENING", "CONNECTED") assert result.flags in ("00010000", "00000000") - assert result.state_string in ("CONNECTED", "LISTENING") - assert result.stream_type_string == "STREAM" assert result.path in ( "/run/systemd/private", From e484d9108d6c6a62a7b5ff082753dc58ea679d50 Mon Sep 17 00:00:00 2001 From: Stefan de Reuver <9864602+Horofic@users.noreply.github.com> Date: Mon, 25 Mar 2024 13:59:01 +0100 Subject: [PATCH 2/3] Swap ProcPlugin for NetSocketPlugin --- tests/plugins/os/unix/linux/test_sockets.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/plugins/os/unix/linux/test_sockets.py b/tests/plugins/os/unix/linux/test_sockets.py index edefa9430..e16fa4b7e 100644 --- a/tests/plugins/os/unix/linux/test_sockets.py +++ b/tests/plugins/os/unix/linux/test_sockets.py 
@@ -1,16 +1,16 @@ from dissect.target.filesystem import VirtualFilesystem -from dissect.target.plugins.os.unix.linux.proc import ProcPlugin +from dissect.target.plugins.os.unix.linux.sockets import NetSocketPlugin from dissect.target.target import Target def test_sockets_plugin(target_linux_users: Target, fs_linux_proc_sockets: VirtualFilesystem): - target_linux_users.add_plugin(ProcPlugin) + target_linux_users.add_plugin(NetSocketPlugin) results = list(target_linux_users.sockets()) assert len(results) == 24 def test_tcp(target_linux_users: Target, fs_linux_proc_sockets: VirtualFilesystem): - target_linux_users.add_plugin(ProcPlugin) + target_linux_users.add_plugin(NetSocketPlugin) results = list(target_linux_users.sockets.tcp()) @@ -49,7 +49,7 @@ def test_tcp(target_linux_users: Target, fs_linux_proc_sockets: VirtualFilesyste def test_udp(target_linux_users: Target, fs_linux_proc_sockets: VirtualFilesystem): - target_linux_users.add_plugin(ProcPlugin) + target_linux_users.add_plugin(NetSocketPlugin) results = list(target_linux_users.sockets.udp()) assert len(results) == 7 @@ -82,7 +82,7 @@ def test_udp(target_linux_users: Target, fs_linux_proc_sockets: VirtualFilesyste def test_raw(target_linux_users: Target, fs_linux_proc_sockets: VirtualFilesystem): - target_linux_users.add_plugin(ProcPlugin) + target_linux_users.add_plugin(NetSocketPlugin) results = list(target_linux_users.sockets.raw()) assert len(results) == 4 @@ -105,7 +105,7 @@ def test_raw(target_linux_users: Target, fs_linux_proc_sockets: VirtualFilesyste def test_packet(target_linux_users: Target, fs_linux_proc_sockets: VirtualFilesystem): - target_linux_users.add_plugin(ProcPlugin) + target_linux_users.add_plugin(NetSocketPlugin) results = list(target_linux_users.sockets.packet()) assert len(results) == 2 
@@ -121,7 +121,7 @@ def test_packet(target_linux_users: Target, fs_linux_proc_sockets: VirtualFilesy def test_unix(target_linux_users: Target, fs_linux_proc_sockets: VirtualFilesystem): - target_linux_users.add_plugin(ProcPlugin) + target_linux_users.add_plugin(NetSocketPlugin) results = list(target_linux_users.sockets.unix()) assert len(results) == 4 From 5bb64a77c216dc86b0099ef4c5348aaeba4a83e3 Mon Sep 17 00:00:00 2001 From: Stefan de Reuver <9864602+Horofic@users.noreply.github.com> Date: Mon, 25 Mar 2024 14:12:35 +0100 Subject: [PATCH 3/3] Explicitly add ProcPlugin and NetSocketPlugin --- tests/plugins/os/unix/linux/test_sockets.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/plugins/os/unix/linux/test_sockets.py b/tests/plugins/os/unix/linux/test_sockets.py index e16fa4b7e..8f8a768e5 100644 --- a/tests/plugins/os/unix/linux/test_sockets.py +++ b/tests/plugins/os/unix/linux/test_sockets.py @@ -1,15 +1,18 @@ from dissect.target.filesystem import VirtualFilesystem +from dissect.target.plugins.os.unix.linux.proc import ProcPlugin from dissect.target.plugins.os.unix.linux.sockets import NetSocketPlugin from dissect.target.target import Target def test_sockets_plugin(target_linux_users: Target, fs_linux_proc_sockets: VirtualFilesystem): + target_linux_users.add_plugin(ProcPlugin) target_linux_users.add_plugin(NetSocketPlugin) results = list(target_linux_users.sockets()) assert len(results) == 24 def test_tcp(target_linux_users: Target, fs_linux_proc_sockets: VirtualFilesystem): + target_linux_users.add_plugin(ProcPlugin) target_linux_users.add_plugin(NetSocketPlugin) results = list(target_linux_users.sockets.tcp()) @@ -49,6 +52,7 @@ def test_tcp(target_linux_users: Target, fs_linux_proc_sockets: VirtualFilesyste def test_udp(target_linux_users: Target, fs_linux_proc_sockets: VirtualFilesystem): + target_linux_users.add_plugin(ProcPlugin) target_linux_users.add_plugin(NetSocketPlugin) results = list(target_linux_users.sockets.udp()) 
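Review bot: Every test in this patch now registers `ProcPlugin` and then `NetSocketPlugin` by hand, and the history of the series (the import dropped in PATCH 2/3 and restored here) suggests `NetSocketPlugin` does not work without `ProcPlugin` loaded first, but nothing in the test file says so. A fixture that performs both registrations with a comment on the dependency would remove the six duplicated pairs (this hunk, the `test_udp` hunk below it, and the three hunks in the following lines) and keep the reason in one place; the reason itself is inferred and should be confirmed against the plugin. The fixture below is a constructed example.

```python
@pytest.fixture
def target_sockets(target_linux_users: Target, fs_linux_proc_sockets: VirtualFilesystem) -> Target:
    # NetSocketPlugin appears to resolve owning processes through ProcPlugin,
    # so both must be registered, ProcPlugin first — confirm against the plugin.
    target_linux_users.add_plugin(ProcPlugin)
    target_linux_users.add_plugin(NetSocketPlugin)
    return target_linux_users


def test_tcp(target_sockets: Target):
    results = list(target_sockets.sockets.tcp())
    # … unchanged: assertions on results …
```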
@@ -82,6 +86,7 @@ def test_udp(target_linux_users: Target, fs_linux_proc_sockets: VirtualFilesyste def test_raw(target_linux_users: Target, fs_linux_proc_sockets: VirtualFilesystem): + target_linux_users.add_plugin(ProcPlugin) target_linux_users.add_plugin(NetSocketPlugin) results = list(target_linux_users.sockets.raw()) @@ -105,6 +110,7 @@ def test_raw(target_linux_users: Target, fs_linux_proc_sockets: VirtualFilesyste def test_packet(target_linux_users: Target, fs_linux_proc_sockets: VirtualFilesystem): + target_linux_users.add_plugin(ProcPlugin) target_linux_users.add_plugin(NetSocketPlugin) results = list(target_linux_users.sockets.packet()) @@ -121,6 +127,7 @@ def test_packet(target_linux_users: Target, fs_linux_proc_sockets: VirtualFilesy def test_unix(target_linux_users: Target, fs_linux_proc_sockets: VirtualFilesystem): + target_linux_users.add_plugin(ProcPlugin) target_linux_users.add_plugin(NetSocketPlugin) results = list(target_linux_users.sockets.unix())