Coverage for linuxpy/proc.py: 58%
112 statements
« prev ^ index » next coverage.py v7.10.4, created at 2026-02-19 15:11 +0100
« prev ^ index » next coverage.py v7.10.4, created at 2026-02-19 15:11 +0100
1#
2# This file is part of the linuxpy project
3#
4# Copyright (c) 2023 Tiago Coutinho
5# Distributed under the GPLv3 license. See LICENSE for more info.
7from pathlib import Path
9from linuxpy.util import try_numeric
# Root of the procfs mount; every other path below is derived from it.
PROC_PATH: Path = Path("/proc")

# Files read by the top-level helpers of this module.
CPU_INFO_PATH: Path = PROC_PATH / "cpuinfo"
MEM_INFO_PATH: Path = PROC_PATH / "meminfo"
MODULES_PATH: Path = PROC_PATH / "modules"
STAT_PATH: Path = PROC_PATH / "stat"

# Files read by the helpers grouped under the `net` class.
NET_PATH: Path = PROC_PATH / "net"
NET_DEV_PATH: Path = NET_PATH / "dev"
WIRELESS_PATH: Path = NET_PATH / "wireless"
NETSTAT_PATH: Path = NET_PATH / "netstat"
SNMP_PATH: Path = NET_PATH / "snmp"
24def _iter_read_kv(path: Path):
25 with path.open() as fobj:
26 lines = fobj.readlines()
27 for keys, values in zip(lines[::2], lines[1::2], strict=True):
28 key, *keys = keys.split()
29 value, *values = values.split()
30 assert key == value
31 yield key.rstrip(":"), dict(zip(keys, [int(value) for value in values], strict=True))
def iter_cpu_info():
    """
    Iterate over CPU info. Each item represents the information about one of
    the processors in the system.

    Each item is a dictionary built from the "key : value" lines of one
    blank-line separated block of the cpuinfo file. Values of keys containing
    "flags" and of the "bugs" key are split into a list of words; every other
    value goes through `try_numeric`.
    """
    data = CPU_INFO_PATH.read_text()
    for cpu in data.split("\n\n"):
        info = {}
        for line in cpu.splitlines():
            key, separator, value = line.partition(":")
            if not separator:
                # Blank or free-form line: nothing to store.
                continue
            key, value = key.strip(), value.strip()
            if "flags" in key or key == "bugs":
                value = value.split()
            else:
                value = try_numeric(value)
            info[key] = value
        # The file normally ends with a blank line, which makes the split
        # above produce a trailing empty block; do not report it as a CPU.
        if info:
            yield info
def cpu_info():
    """
    Collect everything produced by `iter_cpu_info()` into a tuple: one
    dictionary per system processor, in iteration order.
    """
    processors = iter_cpu_info()
    return tuple(processors)
def iter_mem_info():
    """
    Walk the lines of the system memory information file, producing one
    (field name, field value) pair per line.

    A value written with a " kB" suffix is converted with `try_numeric` and
    multiplied by 1024; any other value is passed to `try_numeric` unchanged.
    """
    kib_suffix = " kB"
    for entry in MEM_INFO_PATH.read_text().splitlines():
        name, raw = (part.strip() for part in entry.split(":", 1))
        if not raw.endswith(kib_suffix):
            yield name, try_numeric(raw)
            continue
        # Drop the unit before converting, then scale by 1024.
        amount = try_numeric(raw[: -len(kib_suffix)])
        yield name, amount * 1024
def mem_info():
    """
    System memory information as a dictionary mapping each field name
    produced by `iter_mem_info()` to its value.
    """
    return {name: amount for name, amount in iter_mem_info()}
def iter_modules():
    """
    Walk the lines of the system modules file, producing one dictionary per
    module.

    Every dictionary has the keys "name", "size", "use_count" and
    "dependencies" (a list, empty when the column is "-"). Lines with at
    least six columns also get "state" and "offset" (parsed as hexadecimal).
    """
    for row in MODULES_PATH.read_text().splitlines():
        columns = row.split()
        name = columns[0]
        size = int(columns[1])
        use_count = int(columns[2])
        raw_dependencies = columns[3]
        if raw_dependencies == "-":
            dependencies = []
        else:
            # The list is comma separated and may end with a comma, hence
            # the removal of empty names.
            dependencies = list(filter(None, raw_dependencies.split(",")))
        module = {
            "name": name,
            "size": size,
            "use_count": use_count,
            "dependencies": dependencies,
        }
        if len(columns) >= 6:
            module["state"] = columns[4]
            module["offset"] = int(columns[5], 16)
        yield module
def modules():
    """
    Collect everything produced by `iter_modules()` into a tuple: one
    dictionary per system module, in iteration order.
    """
    return (*iter_modules(),)
def iter_stat():
    """
    Walk the lines of the system stats file, producing (name, value) pairs.

    * names starting with "cpu": dictionary of the ten integer time counters
    * "intr" and "softirq": dictionary with the counters numbered from 1 plus
      the first number of the line under the "total" key
    * "ctxt", "btime", "processes", "procs_running" and "procs_blocked": the
      first number of the line as an integer

    Any other line is ignored.
    """
    cpu_columns = (
        "user",
        "nice",
        "system",
        "idle",
        "iowait",
        "irq",
        "softirq",
        "steal",
        "guest",
        "guest_nice",
    )
    single_values = {"ctxt", "btime", "processes", "procs_running", "procs_blocked"}
    for row in STAT_PATH.read_text().splitlines():
        name, *numbers = row.split()
        if name.startswith("cpu"):
            counters = (int(number) for number in numbers)
            yield name, dict(zip(cpu_columns, counters, strict=True))
        elif name in ("intr", "softirq"):
            total, *per_source = map(int, numbers)
            table = {index: count for index, count in enumerate(per_source, start=1)}
            table["total"] = total
            yield name, table
        elif name in single_values:
            yield name, int(numbers[0])
def stat():
    """
    System stats information as a dictionary mapping each name produced by
    `iter_stat()` to its value.
    """
    entries = iter_stat()
    return dict(entries)
class net:
    """
    Namespace grouping the readers of the network related files
    (devices, wireless, netstat and snmp).
    """

    @staticmethod
    def iter_devices():
        """
        Iterate over network devices. Each item represents the information about one of
        the network devices in the system.

        Each item is a dictionary with the "interface" name plus a "receive"
        and a "transmit" dictionary of integer counters.
        """
        receive_columns = ("bytes", "packets", "errs", "drop", "fifo", "frame", "compressed", "multicast")
        transmit_columns = ("bytes", "packets", "errs", "drop", "fifo", "colls", "carrier", "compressed")
        with NET_DEV_PATH.open() as fobj:
            lines = fobj.readlines()
        # Skip the header lines (usually first 2 lines)
        for line in lines[2:]:
            # Split on the colon instead of on whitespace so that the name is
            # still isolated when the first counter directly follows the colon.
            name, separator, counters = line.rpartition(":")
            if not separator:
                # Blank line (or anything that is not "<interface>: ...").
                continue
            fields = counters.split()
            yield {
                "interface": name.strip(),
                "receive": {column: int(fields[index]) for index, column in enumerate(receive_columns)},
                "transmit": {
                    column: int(fields[index])
                    for index, column in enumerate(transmit_columns, start=len(receive_columns))
                },
            }

    @staticmethod
    def devices():
        """
        Network devices info as a sequence of dictionaries, each with information about one of
        the system network devices.
        """
        return tuple(net.iter_devices())

    @staticmethod
    def iter_wireless():
        """
        Iterate over wireless network devices. Each item represents the information about one of
        the wireless network devices in the system.

        Each item is a dictionary with the "interface" name, the "status"
        (parsed as hexadecimal), a "quality" dictionary (link, level, noise),
        a "discarded" dictionary (nwid, crypt, frag, retry, misc) and a
        "missed" dictionary (beacon).
        """
        with WIRELESS_PATH.open() as fobj:
            lines = fobj.readlines()
        # Skip the header lines (usually first 2 lines)
        for line in lines[2:]:
            name, separator, stats = line.rpartition(":")
            if not separator:
                # Blank line (or anything that is not "<interface>: ...").
                continue
            fields = stats.split()
            yield {
                "interface": name.strip(),
                "status": int(fields[0], 16),
                # Quality values may carry a trailing "." marker.
                "quality": {
                    "link": int(fields[1].rstrip(".")),
                    "level": int(fields[2].rstrip(".")),
                    "noise": int(fields[3].rstrip(".")),
                },
                # Column order in the file: nwid, crypt, frag, retry, misc.
                "discarded": {
                    "nwid": int(fields[4]),
                    "crypt": int(fields[5]),
                    "frag": int(fields[6]),
                    "retry": int(fields[7]),
                    "misc": int(fields[8]),
                },
                "missed": {
                    "beacon": int(fields[9]),
                },
            }

    @staticmethod
    def wireless():
        """
        Wireless network devices info as a sequence of dictionaries, each with information about one of
        the system wireless network devices.
        """
        return tuple(net.iter_wireless())

    @staticmethod
    def iter_netstat():
        """
        Iterate over network statistics. Each item is a pair of group name and
        dictionary of integer counters, as produced by `_iter_read_kv`.
        """
        return _iter_read_kv(NETSTAT_PATH)

    @staticmethod
    def netstat():
        """
        Network statistics as a dictionary of group name to counters.
        """
        return dict(net.iter_netstat())

    @staticmethod
    def iter_snmp():
        """
        Iterate over SNMP statistics. Each item is a pair of group name and
        dictionary of integer counters, as produced by `_iter_read_kv`.
        """
        return _iter_read_kv(SNMP_PATH)

    @staticmethod
    def snmp():
        """
        SNMP statistics as a dictionary of group name to counters.
        """
        return dict(net.iter_snmp())
def iter_pids():
    """
    Iterate over the process identifiers: the names of the entries directly
    under the proc directory that consist only of digits, as integers.
    """
    entries = PROC_PATH.iterdir()
    return (int(entry.name) for entry in entries if entry.name.isdigit())
def pids():
    """
    Process identifiers produced by `iter_pids()`, collected into a tuple.
    """
    found = iter_pids()
    return tuple(found)