Coverage for linuxpy/gpio/config.py: 84%

145 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2025-05-27 13:54 +0200

1# 

2# This file is part of the linuxpy project 

3# 

4# Copyright (c) 2024 Tiago Coutinho 

5# Distributed under the GPLv3 license. See LICENSE for more info. 

6 

7 

8""" 

9line_config = { 

10 "line": 5, 

11 "direction": output | input, 

12 "bias": "" | "pull-up" | "pull-down" | "disabled" 

13 "clock": "" | "realtime" | "hte" # "" defaults to monotonic, HTE: hardware timestamp engine 

14 

15 "drive": "" | drain | source, # direction must be output, "" means push-pull 

16 "debounce": "" | float # direction must be output; "" means don't set it; 

17 

18 "edge": "" | "rising" | "falling" | "both" # direction must be input 

19} 

20 

21config = { 

22 lines: 1 | [1,2,3], 

23 

24} 

25""" 

26 

27import collections 

28import functools 

29from collections.abc import Mapping 

30 

31from linuxpy.types import Collection, Optional, Sequence, Union 

32from linuxpy.util import index_mask, sentinel, sequence_indexes 

33 

34from .raw import MAX_ATTRS, LineAttrId, LineFlag, gpio_v2_line_config, gpio_v2_line_request 

35 

36 

37def check_line_config(line_config: dict): 

38 line = line_config["line"] 

39 if not isinstance(line, int) or line < 0: 

40 raise ValueError("line must be an int > 0") 

41 direction = line_config.get("direction", "input").upper() 

42 direction = LineFlag[direction] 

43 bias = line_config.get("bias", sentinel) 

44 clock = line_config.get("clock", sentinel) 

45 active = line_config.get("active", sentinel) 

46 

47 drive = line_config.get("drive", sentinel) 

48 debounce = line_config.get("debounce", sentinel) 

49 

50 edge = line_config.get("edge", sentinel) 

51 

52 if direction == LineFlag.INPUT: 

53 if drive is not sentinel: 

54 raise ValueError("Can only set drive on output lines") 

55 if debounce is not sentinel: 

56 raise ValueError("Can only set debounce on output lines") 

57 if edge is not sentinel: 

58 if edge.lower() not in ("rising", "falling", "both"): 

59 raise ValueError("edge must be 'rising', 'falling' or 'both'") 

60 elif direction == LineFlag.OUTPUT: 

61 if edge is not sentinel: 

62 raise ValueError("Can only set edge on input lines") 

63 if drive is not sentinel: 

64 if drive.lower() not in ("push-pull", "drain", "source"): 

65 raise ValueError("drive must be 'drain', 'source' or 'push-pull'") 

66 if debounce is not sentinel: 

67 if not isinstance(debounce, float) or debounce < 0: 

68 raise ValueError("debounce must be a positive number") 

69 else: 

70 raise ValueError("direction can only be output or input") 

71 

72 if active is not sentinel: 

73 if active.lower() not in ("high", "low"): 

74 raise ValueError("active must be 'rising', 'high' or 'low'") 

75 

76 if bias is not sentinel: 

77 if bias.lower() not in ("pull-up", "pull-down"): 

78 raise ValueError("bias must be 'pull-up' or 'pull-down'") 

79 

80 if clock is not sentinel: 

81 if clock.lower() not in ("monotonic", "realtime", "hte"): 

82 raise ValueError("drive must be 'monotonic', 'realtime' or 'hte'") 

83 

84 

85_CONFIG_DIRECTION_MAP: dict[Union[str, LineFlag], LineFlag] = { 

86 "input": LineFlag.INPUT, 

87 "output": LineFlag.OUTPUT, 

88 "none": LineFlag.INPUT, 

89 "": LineFlag.INPUT, 

90 LineFlag.INPUT: LineFlag.INPUT, 

91 LineFlag.OUTPUT: LineFlag.OUTPUT, 

92} 

93 

94 

95_CONFIG_EDGE_MAP: dict[Union[str, LineFlag], LineFlag] = { 

96 "rising": LineFlag.EDGE_RISING, 

97 "falling": LineFlag.EDGE_FALLING, 

98 "both": LineFlag.EDGE_RISING | LineFlag.EDGE_FALLING, 

99 "": LineFlag(0), 

100 "none": LineFlag(0), 

101 LineFlag(0): LineFlag(0), 

102 LineFlag.EDGE_RISING: LineFlag.EDGE_RISING, 

103 LineFlag.EDGE_FALLING: LineFlag.EDGE_FALLING, 

104 LineFlag.EDGE_RISING | LineFlag.EDGE_FALLING: LineFlag.EDGE_RISING | LineFlag.EDGE_FALLING, 

105} 

106 

107 

108_CONFIG_DRIVE_MAP: dict[Union[str, LineFlag], LineFlag] = { 

109 "drain": LineFlag.OPEN_DRAIN, 

110 "source": LineFlag.OPEN_SOURCE, 

111 "push-pull": LineFlag(0), 

112 "": LineFlag(0), 

113 "none": LineFlag(0), 

114 LineFlag.OPEN_DRAIN: LineFlag.OPEN_DRAIN, 

115 LineFlag.OPEN_SOURCE: LineFlag.OPEN_SOURCE, 

116 LineFlag(0): LineFlag(0), 

117} 

118 

119 

120_CONFIG_BIAS_MAP: dict[Union[str, LineFlag], LineFlag] = { 

121 "pull-up": LineFlag.BIAS_PULL_UP, 

122 "pull-down": LineFlag.BIAS_PULL_DOWN, 

123 "": LineFlag(0), 

124 "none": LineFlag(0), 

125 LineFlag.BIAS_PULL_UP: LineFlag.BIAS_PULL_UP, 

126 LineFlag.BIAS_PULL_DOWN: LineFlag.BIAS_PULL_DOWN, 

127 LineFlag(0): LineFlag(0), 

128} 

129 

130 

131_CONFIG_CLOCK_MAP: dict[Union[str, LineFlag], LineFlag] = { 

132 "realtime": LineFlag.EVENT_CLOCK_REALTIME, 

133 "hte": LineFlag.EVENT_CLOCK_HTE, 

134 "": LineFlag(0), 

135 "monotonic": LineFlag(0), 

136 LineFlag.EVENT_CLOCK_REALTIME: LineFlag.EVENT_CLOCK_REALTIME, 

137 LineFlag.EVENT_CLOCK_HTE: LineFlag.EVENT_CLOCK_HTE, 

138 LineFlag(0): LineFlag(0), 

139} 

140 

141 

142_CONFIG_ACTIVE: dict[Union[str, LineFlag], LineFlag] = { 

143 "high": LineFlag(0), 

144 "low": LineFlag.ACTIVE_LOW, 

145 LineFlag.ACTIVE_LOW: LineFlag.ACTIVE_LOW, 

146 "": LineFlag(0), 

147 LineFlag(0): LineFlag(0), 

148} 

149 

150 

151def encode_line_config(line_config: dict) -> tuple[int, LineFlag, Union[int, None]]: 

152 direction = line_config.get("direction", "input").lower() 

153 bias = line_config.get("bias", "").lower() 

154 clock = line_config.get("clock", "").lower() 

155 drive = line_config.get("drive", "").lower() 

156 edge = line_config.get("edge", "").lower() 

157 active = line_config.get("active", "").lower() 

158 flags = ( 

159 _CONFIG_DIRECTION_MAP[direction] 

160 | _CONFIG_EDGE_MAP[edge] 

161 | _CONFIG_DRIVE_MAP[drive] 

162 | _CONFIG_BIAS_MAP[bias] 

163 | _CONFIG_CLOCK_MAP[clock] 

164 | _CONFIG_ACTIVE[active] 

165 ) 

166 

167 debounce = line_config.get("debounce", None) 

168 debounce = None if debounce is None else int(debounce * 1_000_000) 

169 

170 return line_config["line"], flags, debounce 

171 

172 

173def encode_config_lines( 

174 lines: Sequence[tuple[int, LineFlag, Union[int, None]]], raw_config: Union[gpio_v2_line_config, None] 

175) -> gpio_v2_line_config: 

176 flags = collections.defaultdict(list) 

177 debounces = collections.defaultdict(list) 

178 for line, flag, debounce in lines: 

179 flags[flag].append(line) 

180 if debounce is not None: 

181 debounces[debounce].append(line) 

182 

183 if len(flags) + len(debounces) > MAX_ATTRS: 

184 raise ValueError("Config exceeds maximum custom") 

185 

186 if raw_config is None: 

187 raw_config = gpio_v2_line_config() 

188 

189 flags_lines = collections.Counter({flag: len(lines) for flag, lines in flags.items()}) 

190 general_flags, _ = flags_lines.most_common(1)[0] 

191 flags.pop(general_flags) 

192 raw_config.flags = general_flags 

193 

194 line_indexes = sequence_indexes(line[0] for line in lines) 

195 for idx, (flag, flag_lines) in enumerate(flags.items()): 

196 raw_config.attrs[idx].mask = index_mask(line_indexes, flag_lines) 

197 raw_config.attrs[idx].attr.id = LineAttrId.FLAGS 

198 raw_config.attrs[idx].attr.flags = flag 

199 

200 for idx, (debounce, debounce_lines) in enumerate(debounces.items(), len(flags)): 

201 raw_config.attrs[idx].mask = index_mask(line_indexes, debounce_lines) 

202 raw_config.attrs[idx].attr.id = LineAttrId.DEBOUNCE 

203 raw_config.attrs[idx].attr.debounce_period_us = debounce 

204 

205 raw_config.num_attrs = len(flags) + len(debounces) 

206 

207 return raw_config 

208 

209 

210def encode_config(config: list[dict], raw_config: Union[gpio_v2_line_config, None] = None) -> gpio_v2_line_config: 

211 lines = [encode_line_config(line_config) for line_config in config] 

212 return encode_config_lines(lines, raw_config) 

213 

214 

215def encode_request(config: dict, raw_request: Union[gpio_v2_line_request, None] = None) -> gpio_v2_line_request: 

216 if raw_request is None: 

217 raw_request = gpio_v2_line_request() 

218 raw_request.consumer = config.get("name", "linuxpy").encode() 

219 config_lines = config["lines"] 

220 lines = [config_line["line"] for config_line in config_lines] 

221 raw_request.num_lines = len(lines) 

222 raw_request.offsets[: len(lines)] = lines 

223 

224 encode_config(config_lines, raw_request.config) 

225 return raw_request 

226 

227 

228def parse_config_line(line): 

229 if isinstance(line, int): 

230 return {"line": line} 

231 return line 

232 

233 

234def parse_config_lines(lines: Union[Collection, int]) -> list[dict]: 

235 if isinstance(lines, int): 

236 lines = [lines] 

237 if isinstance(lines, dict): 

238 return [{**cfg, "line": line} for line, cfg in lines.items()] 

239 if isinstance(lines, (list, tuple)): 

240 return [parse_config_line(line) for line in lines] 

241 raise TypeError("lines must be an int, a sequence of int or dict or dict") 

242 

243 

244def parse_config(config: Optional[Union[Collection, int]], nb_lines: int) -> dict: 

245 if config is None: 

246 result = {"lines": tuple(range(nb_lines))} 

247 elif isinstance(config, int): 

248 result = {"lines": (config,)} 

249 elif isinstance(config, Sequence): 

250 result = {"lines": config} 

251 elif isinstance(config, Mapping): 

252 result = config 

253 else: 

254 raise TypeError("config must be an int, list or dict") 

255 result["lines"] = parse_config_lines(result["lines"]) 

256 result.setdefault("name", "linuxpy") 

257 return result 

258 

259 

260def CLine(nb, direction="", bias="", drive="", edge="", clock="", debounce=None): 

261 result = {"line": nb} 

262 if direction: 

263 result["direction"] = direction 

264 if bias: 

265 result["bias"] = bias 

266 if drive: 

267 result["drive"] = drive 

268 if edge: 

269 result["edge"] = edge 

270 if clock: 

271 result["clock"] = clock 

272 if debounce is not None: 

273 result["debounce"] = debounce 

274 return result 

275 

276 

277CLineIn = functools.partial(CLine, direction="input") 

278CLineOut = functools.partial(CLine, direction="output") 

279 

280 

281def Config(lines, name="linuxpy"): 

282 return {"name": name, "lines": parse_config_lines(lines)}