Coverage for linuxpy/gpio/config.py: 77%

145 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-10-21 07:51 +0200

1# 

2# This file is part of the linuxpy project 

3# 

4# Copyright (c) 2024 Tiago Coutinho 

5# Distributed under the GPLv3 license. See LICENSE for more info. 

6 

7 

8""" 

9line_config = { 

10 "line": 5, 

11 "direction": output | input, 

12 "bias": "" | "pull-up" | "pull-down" | "disabled" 

13 "clock": "" | "realtime" | "hte" # "" defaults to monotonic, HTE: hardware timestamp engine 

14 

15 "drive": "" | drain | source, # direction must be output, "" means push-pull 

16 "debounce": "" | float # direction must be output; "" means don't set it; 

17 

18 "edge": "" | "rising" | "falling" | "both" # direction must be input 

19} 

20 

21config = { 

22 lines: 1 | [1,2,3], 

23 

24} 

25""" 

26 

27import collections 

28import functools 

29from collections.abc import Mapping 

30 

31from linuxpy.types import Collection, Optional, Sequence, Union 

32from linuxpy.util import index_mask, sentinel, sequence_indexes 

33 

34from .raw import MAX_ATTRS, LineAttrId, LineFlag, gpio_v2_line_config, gpio_v2_line_request 

35 

36 

37def check_line_config(line_config: dict): 

38 line = line_config["line"] 

39 if not isinstance(line, int) or line < 0: 

40 raise ValueError("line must be an int > 0") 

41 direction = line_config.get("direction", "input").upper() 

42 direction = LineFlag[direction] 

43 bias = line_config.get("bias", sentinel) 

44 clock = line_config.get("clock", sentinel) 

45 active = line_config.get("active", sentinel) 

46 

47 drive = line_config.get("drive", sentinel) 

48 debounce = line_config.get("debounce", sentinel) 

49 

50 edge = line_config.get("edge", sentinel) 

51 

52 if direction == LineFlag.INPUT: 

53 if drive is not sentinel: 

54 raise ValueError("Can only set drive on output lines") 

55 if debounce is not sentinel: 55 ↛ 57line 55 didn't jump to line 57 because the condition on line 55 was always true

56 raise ValueError("Can only set debounce on output lines") 

57 if edge is not sentinel: 

58 if edge.lower() not in ("rising", "falling", "both"): 

59 raise ValueError("edge must be 'rising', 'falling' or 'both'") 

60 elif direction == LineFlag.OUTPUT: 60 ↛ 70line 60 didn't jump to line 70 because the condition on line 60 was always true

61 if edge is not sentinel: 61 ↛ 63line 61 didn't jump to line 63 because the condition on line 61 was always true

62 raise ValueError("Can only set edge on input lines") 

63 if drive is not sentinel: 

64 if drive.lower() not in ("push-pull", "drain", "source"): 

65 raise ValueError("drive must be 'drain', 'source' or 'push-pull'") 

66 if debounce is not sentinel: 

67 if not isinstance(debounce, float) or debounce < 0: 

68 raise ValueError("debounce must be a positive number") 

69 else: 

70 raise ValueError("direction can only be output or input") 

71 

72 if active is not sentinel: 

73 if active.lower() not in ("high", "low"): 

74 raise ValueError("active must be 'rising', 'high' or 'low'") 

75 

76 if bias is not sentinel: 

77 if bias.lower() not in ("pull-up", "pull-down"): 

78 raise ValueError("bias must be 'pull-up' or 'pull-down'") 

79 

80 if clock is not sentinel: 

81 if clock.lower() not in ("monotonic", "realtime", "hte"): 

82 raise ValueError("drive must be 'monotonic', 'realtime' or 'hte'") 

83 

84 

85_CONFIG_DIRECTION_MAP: dict[Union[str, LineFlag], LineFlag] = { 

86 "input": LineFlag.INPUT, 

87 "output": LineFlag.OUTPUT, 

88 "none": LineFlag.INPUT, 

89 "": LineFlag.INPUT, 

90 LineFlag.INPUT: LineFlag.INPUT, 

91 LineFlag.OUTPUT: LineFlag.OUTPUT, 

92} 

93 

94 

95_CONFIG_EDGE_MAP: dict[Union[str, LineFlag], LineFlag] = { 

96 "rising": LineFlag.EDGE_RISING, 

97 "falling": LineFlag.EDGE_FALLING, 

98 "both": LineFlag.EDGE_RISING | LineFlag.EDGE_FALLING, 

99 "": LineFlag(0), 

100 "none": LineFlag(0), 

101 LineFlag(0): LineFlag(0), 

102 LineFlag.EDGE_RISING: LineFlag.EDGE_RISING, 

103 LineFlag.EDGE_FALLING: LineFlag.EDGE_FALLING, 

104 LineFlag.EDGE_RISING | LineFlag.EDGE_FALLING: LineFlag.EDGE_RISING | LineFlag.EDGE_FALLING, 

105} 

106 

107 

108_CONFIG_DRIVE_MAP: dict[Union[str, LineFlag], LineFlag] = { 

109 "drain": LineFlag.OPEN_DRAIN, 

110 "source": LineFlag.OPEN_SOURCE, 

111 "push-pull": LineFlag(0), 

112 "": LineFlag(0), 

113 "none": LineFlag(0), 

114 LineFlag.OPEN_DRAIN: LineFlag.OPEN_DRAIN, 

115 LineFlag.OPEN_SOURCE: LineFlag.OPEN_SOURCE, 

116 LineFlag(0): LineFlag(0), 

117} 

118 

119 

120_CONFIG_BIAS_MAP: dict[Union[str, LineFlag], LineFlag] = { 

121 "pull-up": LineFlag.BIAS_PULL_UP, 

122 "pull-down": LineFlag.BIAS_PULL_DOWN, 

123 "": LineFlag(0), 

124 "none": LineFlag(0), 

125 LineFlag.BIAS_PULL_UP: LineFlag.BIAS_PULL_UP, 

126 LineFlag.BIAS_PULL_DOWN: LineFlag.BIAS_PULL_DOWN, 

127 LineFlag(0): LineFlag(0), 

128} 

129 

130 

131_CONFIG_CLOCK_MAP: dict[Union[str, LineFlag], LineFlag] = { 

132 "realtime": LineFlag.EVENT_CLOCK_REALTIME, 

133 "hte": LineFlag.EVENT_CLOCK_HTE, 

134 "": LineFlag(0), 

135 "monotonic": LineFlag(0), 

136 LineFlag.EVENT_CLOCK_REALTIME: LineFlag.EVENT_CLOCK_REALTIME, 

137 LineFlag.EVENT_CLOCK_HTE: LineFlag.EVENT_CLOCK_HTE, 

138 LineFlag(0): LineFlag(0), 

139} 

140 

141 

142_CONFIG_ACTIVE: dict[Union[str, LineFlag], LineFlag] = { 

143 "high": LineFlag(0), 

144 "low": LineFlag.ACTIVE_LOW, 

145 LineFlag.ACTIVE_LOW: LineFlag.ACTIVE_LOW, 

146 "": LineFlag(0), 

147 LineFlag(0): LineFlag(0), 

148} 

149 

150 

151def encode_line_config(line_config: dict) -> tuple[int, LineFlag, Union[int, None]]: 

152 direction = line_config.get("direction", "input").lower() 

153 bias = line_config.get("bias", "").lower() 

154 clock = line_config.get("clock", "").lower() 

155 drive = line_config.get("drive", "").lower() 

156 edge = line_config.get("edge", "").lower() 

157 active = line_config.get("active", "").lower() 

158 flags = ( 

159 _CONFIG_DIRECTION_MAP[direction] 

160 | _CONFIG_EDGE_MAP[edge] 

161 | _CONFIG_DRIVE_MAP[drive] 

162 | _CONFIG_BIAS_MAP[bias] 

163 | _CONFIG_CLOCK_MAP[clock] 

164 | _CONFIG_ACTIVE[active] 

165 ) 

166 

167 debounce = line_config.get("debounce", None) 

168 debounce = None if debounce is None else int(debounce * 1_000_000) 

169 

170 return line_config["line"], flags, debounce 

171 

172 

173def encode_config_lines( 

174 lines: Sequence[tuple[int, LineFlag, Union[int, None]]], raw_config: Union[gpio_v2_line_config, None] 

175) -> gpio_v2_line_config: 

176 flags = collections.defaultdict(list) 

177 debounces = collections.defaultdict(list) 

178 for line, flag, debounce in lines: 

179 flags[flag].append(line) 

180 if debounce is not None: 

181 debounces[debounce].append(line) 

182 

183 if len(flags) + len(debounces) > MAX_ATTRS: 183 ↛ 184line 183 didn't jump to line 184 because the condition on line 183 was never true

184 raise ValueError("Config exceeds maximum custom") 

185 

186 if raw_config is None: 

187 raw_config = gpio_v2_line_config() 

188 

189 flags_lines = collections.Counter({flag: len(lines) for flag, lines in flags.items()}) 

190 general_flags, _ = flags_lines.most_common(1)[0] 

191 flags.pop(general_flags) 

192 raw_config.flags = general_flags 

193 

194 line_indexes = sequence_indexes(line[0] for line in lines) 

195 for idx, (flag, flag_lines) in enumerate(flags.items()): 

196 raw_config.attrs[idx].mask = index_mask(line_indexes, flag_lines) 

197 raw_config.attrs[idx].attr.id = LineAttrId.FLAGS 

198 raw_config.attrs[idx].attr.flags = flag 

199 

200 for idx, (debounce, debounce_lines) in enumerate(debounces.items(), len(flags)): 

201 raw_config.attrs[idx].mask = index_mask(line_indexes, debounce_lines) 

202 raw_config.attrs[idx].attr.id = LineAttrId.DEBOUNCE 

203 raw_config.attrs[idx].attr.debounce_period_us = debounce 

204 

205 raw_config.num_attrs = len(flags) + len(debounces) 

206 

207 return raw_config 

208 

209 

210def encode_config(config: list[dict], raw_config: Union[gpio_v2_line_config, None] = None) -> gpio_v2_line_config: 

211 lines = [encode_line_config(line_config) for line_config in config] 

212 return encode_config_lines(lines, raw_config) 

213 

214 

215def encode_request(config: dict, raw_request: Union[gpio_v2_line_request, None] = None) -> gpio_v2_line_request: 

216 if raw_request is None: 216 ↛ 218line 216 didn't jump to line 218 because the condition on line 216 was always true

217 raw_request = gpio_v2_line_request() 

218 raw_request.consumer = config.get("name", "linuxpy").encode() 

219 config_lines = config["lines"] 

220 lines = [config_line["line"] for config_line in config_lines] 

221 raw_request.num_lines = len(lines) 

222 raw_request.offsets[: len(lines)] = lines 

223 

224 encode_config(config_lines, raw_request.config) 

225 return raw_request 

226 

227 

228def parse_config_line(line): 

229 if isinstance(line, int): 

230 return {"line": line} 

231 return line 

232 

233 

234def parse_config_lines(lines: Union[Collection, int]) -> list[dict]: 

235 if isinstance(lines, int): 

236 lines = [lines] 

237 if isinstance(lines, dict): 

238 return [{**cfg, "line": line} for line, cfg in lines.items()] 

239 if isinstance(lines, (list, tuple)): 239 ↛ 241line 239 didn't jump to line 241 because the condition on line 239 was always true

240 return [parse_config_line(line) for line in lines] 

241 raise TypeError("lines must be an int, a sequence of int or dict or dict") 

242 

243 

244def parse_config(config: Optional[Union[Collection, int]], nb_lines: int) -> dict: 

245 if config is None: 

246 result = {"lines": tuple(range(nb_lines))} 

247 elif isinstance(config, int): 

248 result = {"lines": (config,)} 

249 elif isinstance(config, Sequence): 

250 result = {"lines": config} 

251 elif isinstance(config, Mapping): 251 ↛ 254line 251 didn't jump to line 254 because the condition on line 251 was always true

252 result = config 

253 else: 

254 raise TypeError("config must be an int, list or dict") 

255 result["lines"] = parse_config_lines(result["lines"]) 

256 result.setdefault("name", "linuxpy") 

257 return result 

258 

259 

260def CLine(nb, direction="", bias="", drive="", edge="", clock="", debounce=None): 

261 result = {"line": nb} 

262 if direction: 

263 result["direction"] = direction 

264 if bias: 

265 result["bias"] = bias 

266 if drive: 

267 result["drive"] = drive 

268 if edge: 

269 result["edge"] = edge 

270 if clock: 

271 result["clock"] = clock 

272 if debounce is not None: 

273 result["debounce"] = debounce 

274 return result 

275 

276 

277CLineIn = functools.partial(CLine, direction="input") 

278CLineOut = functools.partial(CLine, direction="output") 

279 

280 

281def Config(lines, name="linuxpy"): 

282 return {"name": name, "lines": parse_config_lines(lines)}