Coverage for linuxpy/proc.py: 51%

99 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-06 17:42 +0200

1# 

2# This file is part of the linuxpy project 

3# 

4# Copyright (c) 2023 Tiago Coutinho 

5# Distributed under the GPLv3 license. See LICENSE for more info. 

6 

7from pathlib import Path 

8 

9from linuxpy.util import try_numeric 

10 

# Root directory from which every other path in this module is derived.
PROC_PATH = Path("/proc")

# Files located directly under the root directory.
CPU_INFO_PATH: Path = PROC_PATH.joinpath("cpuinfo")
MEM_INFO_PATH: Path = PROC_PATH.joinpath("meminfo")
MODULES_PATH: Path = PROC_PATH.joinpath("modules")
STAT_PATH: Path = PROC_PATH.joinpath("stat")

# Network related files, all located under the "net" sub-directory.
NET_PATH: Path = PROC_PATH.joinpath("net")
DEV_PATH: Path = NET_PATH.joinpath("dev")
WIRELESS_PATH: Path = NET_PATH.joinpath("wireless")
NETSTAT_PATH = NET_PATH.joinpath("netstat")
SNMP_PATH = NET_PATH.joinpath("snmp")

22 

23 

def _iter_read_kv(path: Path):
    """Iterate over the sections of a file made of header/value line pairs.

    The file is read as consecutive pairs of lines: the first line of a pair
    holds a prefix followed by column names, the second holds the same prefix
    followed by one integer per column. A trailing unpaired line is ignored.

    Args:
        path: file to parse.

    Yields:
        ``(name, counters)`` tuples, where *name* is the shared prefix with
        trailing colons removed and *counters* maps each column name to its
        integer value.

    Raises:
        ValueError: if a line of a pair is blank, if the two lines of a pair
            do not start with the same prefix, or if a value is not an
            integer.
    """
    # Read everything up front so the file is closed before the first yield.
    with path.open() as fobj:
        lines = fobj.readlines()
    for header_line, value_line in zip(lines[::2], lines[1::2]):
        header_prefix, *names = header_line.split()
        value_prefix, *raw_values = value_line.split()
        # Explicit check rather than ``assert``: assertions are removed when
        # Python runs with -O, which would let mismatched pairs through.
        if header_prefix != value_prefix:
            raise ValueError(
                f"mismatched section prefix in {path}: {header_prefix!r} != {value_prefix!r}"
            )
        numbers = [int(raw) for raw in raw_values]
        yield header_prefix.rstrip(":"), dict(zip(names, numbers))

32 

33 

def iter_cpu_info():
    """Iterate over the records found in the file at ``CPU_INFO_PATH``.

    The file content is split into records on empty lines and every
    ``key : value`` line of a record becomes one dictionary entry. Values
    whose key contains "flags", or is exactly "bugs", are split on whitespace
    into a list of strings; every other value is passed through
    ``try_numeric``.

    Blank lines and records without any entry are skipped, so a file ending
    with an empty line does not produce a trailing empty dictionary.

    Yields:
        One ``dict`` per non-empty record.

    Raises:
        ValueError: if a non-blank line does not contain a ":" separator.
    """
    data = CPU_INFO_PATH.read_text()
    for record in data.split("\n\n"):
        info = {}
        for line in record.splitlines():
            # More than one consecutive empty line leaves blank lines inside
            # the record; they carry no key/value pair.
            if not line.strip():
                continue
            key, value = map(str.strip, line.split(":", 1))
            if "flags" in key or key == "bugs":
                value = value.split()
            else:
                value = try_numeric(value)
            info[key] = value
        # The text after the final separator is empty: nothing to report.
        if info:
            yield info

46 

47 

def cpu_info():
    """Return every record produced by ``iter_cpu_info()`` as a tuple."""
    records = iter_cpu_info()
    return tuple(records)

50 

51 

def iter_mem_info():
    """Iterate over the ``(name, value)`` pairs of the file at ``MEM_INFO_PATH``.

    Each line is split on its first ":". A value ending in " kB" has that
    suffix removed, is converted with ``try_numeric`` and multiplied by 1024;
    any other value is converted with ``try_numeric`` as is.
    """
    for raw_line in MEM_INFO_PATH.read_text().splitlines():
        name, text = (part.strip() for part in raw_line.split(":", 1))
        if not text.endswith(" kB"):
            yield name, try_numeric(text)
            continue
        # Drop the 3 character " kB" suffix before converting and scaling.
        yield name, try_numeric(text[:-3]) * 1024

61 

62 

def mem_info():
    """Return the pairs produced by ``iter_mem_info()`` as a dictionary."""
    return {name: value for name, value in iter_mem_info()}

65 

66 

def iter_modules():
    """Iterate over the entries of the file at ``MODULES_PATH``.

    Every line is split on whitespace and yielded as a dictionary with the
    keys "name", "size", "use_count" and "dependencies" (a list, empty when
    the 4th column is "-"). When a line has more than five columns the keys
    "state" and "offset" (5th and 6th columns, the latter parsed as a
    hexadecimal integer) are added.
    """
    for entry in MODULES_PATH.read_text().splitlines():
        columns = entry.split()
        module = {"name": columns[0]}
        module["size"] = int(columns[1])
        module["use_count"] = int(columns[2])
        used_by = columns[3]
        if used_by == "-":
            module["dependencies"] = []
        else:
            # Empty items (e.g. after a trailing comma) are discarded.
            module["dependencies"] = list(filter(None, used_by.split(",")))
        if len(columns) >= 6:
            module["state"] = columns[4]
            module["offset"] = int(columns[5], 16)
        yield module

81 

82 

def modules():
    """Return every entry produced by ``iter_modules()`` as a tuple."""
    entries = iter_modules()
    return tuple(entries)

85 

86 

def iter_stat():
    """Iterate over the ``(name, payload)`` pairs of the file at ``STAT_PATH``.

    The payload depends on the first word of the line:

    * names starting with "cpu": a dictionary pairing the column names below
      with the integer fields of the line;
    * "intr" and "softirq": a dictionary mapping 1-based positions to the
      integer fields following the first one, plus a "total" key holding the
      first field;
    * "ctxt", "btime", "processes", "procs_running", "procs_blocked": the
      first field as an integer.

    Lines starting with any other name are skipped.
    """
    cpu_columns = (
        "user",
        "nice",
        "system",
        "idle",
        "iowait",
        "irq",
        "softirq",
        "steal",
        "guest",
        "guest_nice",
    )
    scalar_names = {"ctxt", "btime", "processes", "procs_running", "procs_blocked"}
    for row in STAT_PATH.read_text().splitlines():
        label, *raw = row.split()
        if label.startswith("cpu"):
            yield label, dict(zip(cpu_columns, map(int, raw)))
        elif label in {"intr", "softirq"}:
            total, *per_source = (int(item) for item in raw)
            counters = dict(enumerate(per_source, start=1))
            counters["total"] = total
            yield label, counters
        elif label in scalar_names:
            yield label, int(raw[0])

103 

104 

def stat():
    """Return the pairs produced by ``iter_stat()`` as a dictionary."""
    return {name: payload for name, payload in iter_stat()}

107 

108 

def iter_dev():
    """Iterate over the interface entries of the file at ``DEV_PATH``.

    The first two lines are treated as headers and skipped, as are blank
    lines. Every remaining line is split on whitespace and yielded as a
    dictionary with the keys "interface" (first column without trailing
    colons), "receive" (columns 2 to 9) and "transmit" (columns 10 to 17),
    the latter two being dictionaries of integers.
    """
    receive_columns = ("bytes", "packets", "errs", "drop", "fifo", "frame", "compressed", "multicast")
    transmit_columns = ("bytes", "packets", "errs", "drop", "fifo", "colls", "carrier", "compressed")
    with DEV_PATH.open() as stream:
        rows = stream.readlines()
    for row in rows[2:]:
        cells = row.strip().split()
        if cells:
            name = cells[0].rstrip(":")
            # Index explicitly (rather than zip) so that a short row still
            # raises IndexError.
            received = {column: int(cells[pos]) for pos, column in enumerate(receive_columns, start=1)}
            sent = {column: int(cells[pos]) for pos, column in enumerate(transmit_columns, start=9)}
            yield {"interface": name, "receive": received, "transmit": sent}

140 

141 

def dev():
    """Return every entry produced by ``iter_dev()`` as a tuple."""
    entries = iter_dev()
    return tuple(entries)

144 

145 

def iter_wireless():
    """Iterate over the interface entries of the file at ``WIRELESS_PATH``.

    The first two lines are treated as headers and skipped, as are blank
    lines. Every remaining line is split on whitespace and yielded as a
    dictionary with the keys:

    * "interface": first column without trailing colons;
    * "status": second column parsed as a hexadecimal integer;
    * "quality": dictionary with the "link", "level" and "noise" integers
      (a trailing "." on those columns is ignored);
    * "discarded": dictionary of discarded packet counters;
    * "missed": dictionary with the "beacon" counter, only when the line
      carries that column.

    Two layouts of the discarded packet columns are supported. When a line
    has at least ten columns they are read as ``nwid crypt frag retry misc``
    (optionally followed by the missed beacon counter); shorter lines are read
    with the legacy ``nwid crypt misc`` layout.

    Raises:
        IndexError: if a line has fewer than eight columns.
        ValueError: if a column cannot be converted to an integer.
    """
    with WIRELESS_PATH.open() as fobj:
        lines = fobj.readlines()
    # Skip the header lines (usually first 2 lines)
    for line in lines[2:]:
        fields = line.strip().split()
        if not fields:
            continue
        info = {
            "interface": fields[0].rstrip(":"),
            "status": int(fields[1], 16),
            "quality": {
                "link": int(fields[2].rstrip(".")),
                "level": int(fields[3].rstrip(".")),
                "noise": int(fields[4].rstrip(".")),
            },
        }
        if len(fields) >= 10:
            # Five discarded packet columns: "misc" is the last of them, not
            # the third (which holds "frag").
            info["discarded"] = {
                "nwid": int(fields[5]),
                "crypt": int(fields[6]),
                "frag": int(fields[7]),
                "retry": int(fields[8]),
                "misc": int(fields[9]),
            }
            if len(fields) > 10:
                info["missed"] = {"beacon": int(fields[10])}
        else:
            # Legacy layout with only three discarded packet columns.
            info["discarded"] = {
                "nwid": int(fields[5]),
                "crypt": int(fields[6]),
                "misc": int(fields[7]),
            }
        yield info

168 

169 

def wireless():
    """Return every entry produced by ``iter_wireless()`` as a tuple."""
    entries = iter_wireless()
    return tuple(entries)

172 

173 

def iter_netstat():
    """Return the iterator built by ``_iter_read_kv`` for ``NETSTAT_PATH``."""
    sections = _iter_read_kv(NETSTAT_PATH)
    return sections

176 

177 

def netstat():
    """Return the pairs produced by ``iter_netstat()`` as a dictionary."""
    return {name: counters for name, counters in iter_netstat()}

180 

181 

def iter_snmp():
    """Return the iterator built by ``_iter_read_kv`` for ``SNMP_PATH``."""
    sections = _iter_read_kv(SNMP_PATH)
    return sections

184 

185 

def snmp():
    """Return the pairs produced by ``iter_snmp()`` as a dictionary."""
    return {name: counters for name, counters in iter_snmp()}