Skip to content

model.netinfo_psutil

model.netinfo_psutil

ProcessInfo dataclass

Store process metadata for a socket record.

Source code in model/netinfo.py
 8
 9
10
11
12
13
14
15
16
@dataclass(frozen=True)
class ProcessInfo:
    """Store process metadata for a socket record."""

    status: str
    label: str
    name: str | None = None
    exe: str | None = None
    cmdline: list[str] | None = None

PsutilNetInfo

Collect socket records via psutil and attach process metadata (Windows).

Source code in model/netinfo_psutil.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
class PsutilNetInfo:
    """Collect socket records via psutil and attach process metadata (Windows)."""

    KINDS = ("tcp", "udp")

    def __init__(self, allowed_statuses: set[str] | None = None) -> None:
        self.allowed_statuses = allowed_statuses

    def get_data(self) -> list[dict[str, Any]]:
        """Return connection records with socket and process fields."""
        proc_cache: dict[int, ProcessInfo] = {}
        results: list[dict[str, Any]] = []

        for proto in self.KINDS:
            try:
                conns = psutil.net_connections(kind=proto)
            except (psutil.AccessDenied, PermissionError):
                continue
            except (OSError, RuntimeError):
                continue

            for conn in conns:
                if not self._is_included(conn, proto=proto):
                    continue

                pid = conn.pid
                proc = self._process_info(pid, proc_cache)

                l_ip, l_port = self._split_addr(conn.laddr)
                r_ip, r_port = self._split_addr(conn.raddr)

                results.append(
                    {
                        "pid": pid,
                        "proto": proto,
                        "status": conn.status or "NONE",
                        "family": str(conn.family).replace("AddressFamily.", ""),
                        "type": str(conn.type).replace("SocketKind.", ""),
                        "laddr_ip": l_ip,
                        "laddr_port": l_port,
                        "raddr_ip": r_ip,
                        "raddr_port": r_port,
                        "process_status": proc.status,
                        "process_label": proc.label,
                        "process_name": proc.name,
                        "exe": proc.exe,
                        "cmdline": proc.cmdline,
                    }
                )

        return results

    def _is_included(self, conn: Any, *, proto: str) -> bool:
        """Return True if the connection is included in the snapshot."""
        status = conn.status or "NONE"
        return not (
            proto == "tcp"
            and self.allowed_statuses is not None
            and status not in self.allowed_statuses
        )

    def _process_info(
        self,
        pid: int | None,
        cache: dict[int, ProcessInfo],
    ) -> ProcessInfo:
        """Return process metadata for a PID."""
        if pid is None or pid <= 0:
            return ProcessInfo(status="No process", label="System")

        cached = cache.get(pid)
        if cached is not None:
            return cached

        try:
            proc = psutil.Process(pid)
        except psutil.NoSuchProcess:
            info = ProcessInfo(status="No process", label="System")
            cache[pid] = info
            return info
        except psutil.AccessDenied:
            info = ProcessInfo(status="Access denied", label="Access denied")
            cache[pid] = info
            return info
        except (psutil.ZombieProcess, OSError, RuntimeError):
            info = ProcessInfo(status="Unavailable", label="Unavailable")
            cache[pid] = info
            return info

        access_denied = False

        def read(fn: Callable[[], Any]) -> Any:
            nonlocal access_denied
            try:
                return fn()
            except psutil.AccessDenied:
                access_denied = True
                return None
            except psutil.NoSuchProcess:
                return None
            except (OSError, RuntimeError):
                return None

        with proc.oneshot():
            name = read(proc.name)
            exe = read(proc.exe)
            cmdline = read(proc.cmdline)

        if isinstance(name, str) and name:
            info = ProcessInfo(
                status="OK",
                label=name,
                name=name,
                exe=exe,
                cmdline=cmdline,
            )
        elif access_denied:
            info = ProcessInfo(status="Access denied", label="Access denied")
        else:
            info = ProcessInfo(status="Unavailable", label="Unavailable")

        cache[pid] = info
        return info

    @staticmethod
    def _split_addr(addr: Any) -> tuple[str | None, int | None]:
        """Return (ip, port) from a psutil address value."""
        if not addr:
            return None, None
        try:
            return addr.ip, addr.port
        except AttributeError:
            return addr[0], addr[1]

get_data()

Return connection records with socket and process fields.

Source code in model/netinfo_psutil.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def get_data(self) -> list[dict[str, Any]]:
    """Return connection records with socket and process fields."""
    proc_cache: dict[int, ProcessInfo] = {}
    results: list[dict[str, Any]] = []

    for proto in self.KINDS:
        try:
            conns = psutil.net_connections(kind=proto)
        except (psutil.AccessDenied, PermissionError):
            continue
        except (OSError, RuntimeError):
            continue

        for conn in conns:
            if not self._is_included(conn, proto=proto):
                continue

            pid = conn.pid
            proc = self._process_info(pid, proc_cache)

            l_ip, l_port = self._split_addr(conn.laddr)
            r_ip, r_port = self._split_addr(conn.raddr)

            results.append(
                {
                    "pid": pid,
                    "proto": proto,
                    "status": conn.status or "NONE",
                    "family": str(conn.family).replace("AddressFamily.", ""),
                    "type": str(conn.type).replace("SocketKind.", ""),
                    "laddr_ip": l_ip,
                    "laddr_port": l_port,
                    "raddr_ip": r_ip,
                    "raddr_port": r_port,
                    "process_status": proc.status,
                    "process_label": proc.label,
                    "process_name": proc.name,
                    "exe": proc.exe,
                    "cmdline": proc.cmdline,
                }
            )

    return results