Coverage for linuxpy/proc.py: 51%
99 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-06 17:42 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-06 17:42 +0200
1#
2# This file is part of the linuxpy project
3#
4# Copyright (c) 2023 Tiago Coutinho
5# Distributed under the GPLv3 license. See LICENSE for more info.
7from pathlib import Path
9from linuxpy.util import try_numeric
11PROC_PATH = Path("/proc")
13CPU_INFO_PATH: Path = PROC_PATH / "cpuinfo"
14MEM_INFO_PATH: Path = PROC_PATH / "meminfo"
15MODULES_PATH: Path = PROC_PATH / "modules"
16STAT_PATH: Path = PROC_PATH / "stat"
17NET_PATH: Path = PROC_PATH / "net"
18DEV_PATH: Path = NET_PATH / "dev"
19WIRELESS_PATH: Path = NET_PATH / "wireless"
20NETSTAT_PATH = NET_PATH / "netstat"
21SNMP_PATH = NET_PATH / "snmp"
24def _iter_read_kv(path: Path):
25 with path.open() as fobj:
26 lines = fobj.readlines()
27 for keys, values in zip(lines[::2], lines[1::2]):
28 key, *keys = keys.split()
29 value, *values = values.split()
30 assert key == value
31 yield key.rstrip(":"), dict(zip(keys, [int(value) for value in values]))
34def iter_cpu_info():
35 data = CPU_INFO_PATH.read_text()
36 for cpu in data.split("\n\n"):
37 info = {}
38 for line in cpu.splitlines():
39 key, value = map(str.strip, line.split(":", 1))
40 if "flags" in key or key == "bugs":
41 value = value.split()
42 else:
43 value = try_numeric(value)
44 info[key] = value
45 yield info
48def cpu_info():
49 return tuple(iter_cpu_info())
52def iter_mem_info():
53 data = MEM_INFO_PATH.read_text()
54 for line in data.splitlines():
55 key, value = map(str.strip, line.split(":", 1))
56 if value.endswith(" kB"):
57 value = try_numeric(value[:-3]) * 1024
58 else:
59 value = try_numeric(value)
60 yield key, value
63def mem_info():
64 return dict(iter_mem_info())
67def iter_modules():
68 data = MODULES_PATH.read_text()
69 for line in data.splitlines():
70 fields = line.split()
71 mod = {
72 "name": fields[0],
73 "size": int(fields[1]),
74 "use_count": int(fields[2]),
75 "dependencies": [] if fields[3] == "-" else [dep for dep in fields[3].split(",") if dep],
76 }
77 if len(fields) > 5: 77 ↛ 80line 77 didn't jump to line 80 because the condition on line 77 was always true
78 mod["state"] = fields[4]
79 mod["offset"] = int(fields[5], 16)
80 yield mod
83def modules():
84 return tuple(iter_modules())
87def iter_stat():
88 CPU = "user", "nice", "system", "idle", "iowait", "irq", "softirq", "steal", "guest", "guest_nice"
89 data = STAT_PATH.read_text()
90 for line in data.splitlines():
91 name, *fields = line.split()
92 if name.startswith("cpu"):
93 payload = dict(zip(CPU, map(int, fields)))
94 elif name in {"intr", "softirq"}:
95 total, *fields = (int(field) for field in fields)
96 payload = dict(enumerate(fields, start=1))
97 payload["total"] = total
98 elif name in {"ctxt", "btime", "processes", "procs_running", "procs_blocked"}:
99 payload = int(fields[0])
100 else:
101 continue
102 yield name, payload
105def stat():
106 return dict(iter_stat())
109def iter_dev():
110 with DEV_PATH.open() as fobj:
111 lines = fobj.readlines()
112 # Skip the header lines (usually first 2 lines)
113 for line in lines[2:]:
114 fields = line.strip().split()
115 if not fields:
116 continue
117 yield {
118 "interface": fields[0].rstrip(":"),
119 "receive": {
120 "bytes": int(fields[1]),
121 "packets": int(fields[2]),
122 "errs": int(fields[3]),
123 "drop": int(fields[4]),
124 "fifo": int(fields[5]),
125 "frame": int(fields[6]),
126 "compressed": int(fields[7]),
127 "multicast": int(fields[8]),
128 },
129 "transmit": {
130 "bytes": int(fields[9]),
131 "packets": int(fields[10]),
132 "errs": int(fields[11]),
133 "drop": int(fields[12]),
134 "fifo": int(fields[13]),
135 "colls": int(fields[14]),
136 "carrier": int(fields[15]),
137 "compressed": int(fields[16]),
138 },
139 }
142def dev():
143 return tuple(iter_dev())
146def iter_wireless():
147 with WIRELESS_PATH.open() as fobj:
148 lines = fobj.readlines()
149 # Skip the header lines (usually first 2 lines)
150 for line in lines[2:]:
151 fields = line.strip().split()
152 if not fields:
153 continue
154 yield {
155 "interface": fields[0].rstrip(":"),
156 "status": int(fields[1], 16),
157 "quality": {
158 "link": int(fields[2].rstrip(".")),
159 "level": int(fields[3].rstrip(".")),
160 "noise": int(fields[4].rstrip(".")),
161 },
162 "discarded": {
163 "nwid": int(fields[5]),
164 "crypt": int(fields[6]),
165 "misc": int(fields[7]),
166 },
167 }
170def wireless():
171 return tuple(iter_wireless())
174def iter_netstat():
175 return _iter_read_kv(NETSTAT_PATH)
178def netstat():
179 return dict(iter_netstat())
182def iter_snmp():
183 return _iter_read_kv(SNMP_PATH)
186def snmp():
187 return dict(iter_snmp())