Coverage for linuxpy/proc.py: 55%
99 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-13 08:23 +0200
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-13 08:23 +0200
1#
2# This file is part of the linuxpy project
3#
4# Copyright (c) 2023 Tiago Coutinho
5# Distributed under the GPLv3 license. See LICENSE for more info.
7from pathlib import Path
9from linuxpy.util import try_numeric
11PROC_PATH = Path("/proc")
13CPU_INFO_PATH: Path = PROC_PATH / "cpuinfo"
14MEM_INFO_PATH: Path = PROC_PATH / "meminfo"
15MODULES_PATH: Path = PROC_PATH / "modules"
16STAT_PATH: Path = PROC_PATH / "stat"
17NET_PATH: Path = PROC_PATH / "net"
18DEV_PATH: Path = NET_PATH / "dev"
19WIRELESS_PATH: Path = NET_PATH / "wireless"
20NETSTAT_PATH = NET_PATH / "netstat"
21SNMP_PATH = NET_PATH / "snmp"
24def _iter_read_kv(path: Path):
25 with path.open() as fobj:
26 lines = fobj.readlines()
27 for keys, values in zip(lines[::2], lines[1::2], strict=True):
28 key, *keys = keys.split()
29 value, *values = values.split()
30 assert key == value
31 yield key.rstrip(":"), dict(zip(keys, [int(value) for value in values], strict=True))
34def iter_cpu_info():
35 """
36 Iterate over CPU info. Each item represents the information about one of
37 the processors in the system.
38 """
39 data = CPU_INFO_PATH.read_text()
40 for cpu in data.split("\n\n"):
41 info = {}
42 for line in cpu.splitlines():
43 key, value = map(str.strip, line.split(":", 1))
44 if "flags" in key or key == "bugs":
45 value = value.split()
46 else:
47 value = try_numeric(value)
48 info[key] = value
49 yield info
52def cpu_info():
53 """
54 CPU info as a sequence of dictionaries, each with information about one of
55 the system processors.
56 """
57 return tuple(iter_cpu_info())
60def iter_mem_info():
61 """
62 Iterate over the system memory information. Each item is a pair of field name and field value.
63 """
64 data = MEM_INFO_PATH.read_text()
65 for line in data.splitlines():
66 key, value = map(str.strip, line.split(":", 1))
67 if value.endswith(" kB"):
68 value = try_numeric(value[:-3]) * 1024
69 else:
70 value = try_numeric(value)
71 yield key, value
74def mem_info():
75 """
76 System memory information.
77 """
78 return dict(iter_mem_info())
81def iter_modules():
82 """
83 Iterate over system modules. Each item represents the information about one of
84 the modules in the system.
85 """
86 data = MODULES_PATH.read_text()
87 for line in data.splitlines():
88 fields = line.split()
89 mod = {
90 "name": fields[0],
91 "size": int(fields[1]),
92 "use_count": int(fields[2]),
93 "dependencies": [] if fields[3] == "-" else [dep for dep in fields[3].split(",") if dep],
94 }
95 if len(fields) > 5:
96 mod["state"] = fields[4]
97 mod["offset"] = int(fields[5], 16)
98 yield mod
101def modules():
102 """
103 Modules info as a sequence of dictionaries, each with information about one of
104 the system modules.
105 """
106 return tuple(iter_modules())
109def iter_stat():
110 """
111 Iterate over the system stats information. Each item is a pair of field name and field value.
112 """
113 CPU = "user", "nice", "system", "idle", "iowait", "irq", "softirq", "steal", "guest", "guest_nice"
114 data = STAT_PATH.read_text()
115 for line in data.splitlines():
116 name, *fields = line.split()
117 if name.startswith("cpu"):
118 payload = dict(zip(CPU, map(int, fields), strict=True))
119 elif name in {"intr", "softirq"}:
120 total, *fields = (int(field) for field in fields)
121 payload = dict(enumerate(fields, start=1))
122 payload["total"] = total
123 elif name in {"ctxt", "btime", "processes", "procs_running", "procs_blocked"}:
124 payload = int(fields[0])
125 else:
126 continue
127 yield name, payload
130def stat():
131 """
132 System stats information.
133 """
134 return dict(iter_stat())
137def iter_dev():
138 """
139 Iterate over network devices. Each item represents the information about one of
140 the network devices in the system.
141 """
142 with DEV_PATH.open() as fobj:
143 lines = fobj.readlines()
144 # Skip the header lines (usually first 2 lines)
145 for line in lines[2:]:
146 fields = line.strip().split()
147 if not fields:
148 continue
149 yield {
150 "interface": fields[0].rstrip(":"),
151 "receive": {
152 "bytes": int(fields[1]),
153 "packets": int(fields[2]),
154 "errs": int(fields[3]),
155 "drop": int(fields[4]),
156 "fifo": int(fields[5]),
157 "frame": int(fields[6]),
158 "compressed": int(fields[7]),
159 "multicast": int(fields[8]),
160 },
161 "transmit": {
162 "bytes": int(fields[9]),
163 "packets": int(fields[10]),
164 "errs": int(fields[11]),
165 "drop": int(fields[12]),
166 "fifo": int(fields[13]),
167 "colls": int(fields[14]),
168 "carrier": int(fields[15]),
169 "compressed": int(fields[16]),
170 },
171 }
174def dev():
175 """
176 Network devices info as a sequence of dictionaries, each with information about one of
177 the system network devices.
178 """
179 return tuple(iter_dev())
182def iter_wireless():
183 """
184 Iterate over wireless network devices. Each item represents the information about one of
185 the wireless network devices in the system.
186 """
187 with WIRELESS_PATH.open() as fobj:
188 lines = fobj.readlines()
189 # Skip the header lines (usually first 2 lines)
190 for line in lines[2:]:
191 fields = line.strip().split()
192 if not fields:
193 continue
194 yield {
195 "interface": fields[0].rstrip(":"),
196 "status": int(fields[1], 16),
197 "quality": {
198 "link": int(fields[2].rstrip(".")),
199 "level": int(fields[3].rstrip(".")),
200 "noise": int(fields[4].rstrip(".")),
201 },
202 "discarded": {
203 "nwid": int(fields[5]),
204 "crypt": int(fields[6]),
205 "misc": int(fields[7]),
206 },
207 }
210def wireless():
211 """
212 Wireless netowrk devices info as a sequence of dictionaries, each with information about one of
213 the system wireless network devices.
214 """
215 return tuple(iter_wireless())
218def iter_netstat():
219 """
220 Iterate over network statistics.
221 """
222 return _iter_read_kv(NETSTAT_PATH)
225def netstat():
226 """
227 Network statistics.
228 """
229 return dict(iter_netstat())
232def iter_snmp():
233 """
234 Iterate over SNMP statistics.
235 """
236 return _iter_read_kv(SNMP_PATH)
239def snmp():
240 """
241 SNMP statistics.
242 """
243 return dict(iter_snmp())