Coverage for linuxpy/gpio/config.py: 77%
145 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-10-21 07:51 +0200
« prev ^ index » next coverage.py v7.6.1, created at 2024-10-21 07:51 +0200
1#
2# This file is part of the linuxpy project
3#
4# Copyright (c) 2024 Tiago Coutinho
5# Distributed under the GPLv3 license. See LICENSE for more info.
8"""
9line_config = {
10 "line": 5,
11 "direction": output | input,
12 "bias": "" | "pull-up" | "pull-down" | "disabled"
13 "clock": "" | "realtime" | "hte" # "" defaults to monotonic, HTE: hardware timestamp engine
15 "drive": "" | drain | source, # direction must be output, "" means push-pull
16 "debounce": "" | float # direction must be output; "" means don't set it;
18 "edge": "" | "rising" | "falling" | "both" # direction must be input
19}
21config = {
22 lines: 1 | [1,2,3],
24}
25"""
27import collections
28import functools
29from collections.abc import Mapping
31from linuxpy.types import Collection, Optional, Sequence, Union
32from linuxpy.util import index_mask, sentinel, sequence_indexes
34from .raw import MAX_ATTRS, LineAttrId, LineFlag, gpio_v2_line_config, gpio_v2_line_request
37def check_line_config(line_config: dict):
38 line = line_config["line"]
39 if not isinstance(line, int) or line < 0:
40 raise ValueError("line must be an int > 0")
41 direction = line_config.get("direction", "input").upper()
42 direction = LineFlag[direction]
43 bias = line_config.get("bias", sentinel)
44 clock = line_config.get("clock", sentinel)
45 active = line_config.get("active", sentinel)
47 drive = line_config.get("drive", sentinel)
48 debounce = line_config.get("debounce", sentinel)
50 edge = line_config.get("edge", sentinel)
52 if direction == LineFlag.INPUT:
53 if drive is not sentinel:
54 raise ValueError("Can only set drive on output lines")
55 if debounce is not sentinel: 55 ↛ 57line 55 didn't jump to line 57 because the condition on line 55 was always true
56 raise ValueError("Can only set debounce on output lines")
57 if edge is not sentinel:
58 if edge.lower() not in ("rising", "falling", "both"):
59 raise ValueError("edge must be 'rising', 'falling' or 'both'")
60 elif direction == LineFlag.OUTPUT: 60 ↛ 70line 60 didn't jump to line 70 because the condition on line 60 was always true
61 if edge is not sentinel: 61 ↛ 63line 61 didn't jump to line 63 because the condition on line 61 was always true
62 raise ValueError("Can only set edge on input lines")
63 if drive is not sentinel:
64 if drive.lower() not in ("push-pull", "drain", "source"):
65 raise ValueError("drive must be 'drain', 'source' or 'push-pull'")
66 if debounce is not sentinel:
67 if not isinstance(debounce, float) or debounce < 0:
68 raise ValueError("debounce must be a positive number")
69 else:
70 raise ValueError("direction can only be output or input")
72 if active is not sentinel:
73 if active.lower() not in ("high", "low"):
74 raise ValueError("active must be 'rising', 'high' or 'low'")
76 if bias is not sentinel:
77 if bias.lower() not in ("pull-up", "pull-down"):
78 raise ValueError("bias must be 'pull-up' or 'pull-down'")
80 if clock is not sentinel:
81 if clock.lower() not in ("monotonic", "realtime", "hte"):
82 raise ValueError("drive must be 'monotonic', 'realtime' or 'hte'")
85_CONFIG_DIRECTION_MAP: dict[Union[str, LineFlag], LineFlag] = {
86 "input": LineFlag.INPUT,
87 "output": LineFlag.OUTPUT,
88 "none": LineFlag.INPUT,
89 "": LineFlag.INPUT,
90 LineFlag.INPUT: LineFlag.INPUT,
91 LineFlag.OUTPUT: LineFlag.OUTPUT,
92}
95_CONFIG_EDGE_MAP: dict[Union[str, LineFlag], LineFlag] = {
96 "rising": LineFlag.EDGE_RISING,
97 "falling": LineFlag.EDGE_FALLING,
98 "both": LineFlag.EDGE_RISING | LineFlag.EDGE_FALLING,
99 "": LineFlag(0),
100 "none": LineFlag(0),
101 LineFlag(0): LineFlag(0),
102 LineFlag.EDGE_RISING: LineFlag.EDGE_RISING,
103 LineFlag.EDGE_FALLING: LineFlag.EDGE_FALLING,
104 LineFlag.EDGE_RISING | LineFlag.EDGE_FALLING: LineFlag.EDGE_RISING | LineFlag.EDGE_FALLING,
105}
108_CONFIG_DRIVE_MAP: dict[Union[str, LineFlag], LineFlag] = {
109 "drain": LineFlag.OPEN_DRAIN,
110 "source": LineFlag.OPEN_SOURCE,
111 "push-pull": LineFlag(0),
112 "": LineFlag(0),
113 "none": LineFlag(0),
114 LineFlag.OPEN_DRAIN: LineFlag.OPEN_DRAIN,
115 LineFlag.OPEN_SOURCE: LineFlag.OPEN_SOURCE,
116 LineFlag(0): LineFlag(0),
117}
120_CONFIG_BIAS_MAP: dict[Union[str, LineFlag], LineFlag] = {
121 "pull-up": LineFlag.BIAS_PULL_UP,
122 "pull-down": LineFlag.BIAS_PULL_DOWN,
123 "": LineFlag(0),
124 "none": LineFlag(0),
125 LineFlag.BIAS_PULL_UP: LineFlag.BIAS_PULL_UP,
126 LineFlag.BIAS_PULL_DOWN: LineFlag.BIAS_PULL_DOWN,
127 LineFlag(0): LineFlag(0),
128}
131_CONFIG_CLOCK_MAP: dict[Union[str, LineFlag], LineFlag] = {
132 "realtime": LineFlag.EVENT_CLOCK_REALTIME,
133 "hte": LineFlag.EVENT_CLOCK_HTE,
134 "": LineFlag(0),
135 "monotonic": LineFlag(0),
136 LineFlag.EVENT_CLOCK_REALTIME: LineFlag.EVENT_CLOCK_REALTIME,
137 LineFlag.EVENT_CLOCK_HTE: LineFlag.EVENT_CLOCK_HTE,
138 LineFlag(0): LineFlag(0),
139}
142_CONFIG_ACTIVE: dict[Union[str, LineFlag], LineFlag] = {
143 "high": LineFlag(0),
144 "low": LineFlag.ACTIVE_LOW,
145 LineFlag.ACTIVE_LOW: LineFlag.ACTIVE_LOW,
146 "": LineFlag(0),
147 LineFlag(0): LineFlag(0),
148}
151def encode_line_config(line_config: dict) -> tuple[int, LineFlag, Union[int, None]]:
152 direction = line_config.get("direction", "input").lower()
153 bias = line_config.get("bias", "").lower()
154 clock = line_config.get("clock", "").lower()
155 drive = line_config.get("drive", "").lower()
156 edge = line_config.get("edge", "").lower()
157 active = line_config.get("active", "").lower()
158 flags = (
159 _CONFIG_DIRECTION_MAP[direction]
160 | _CONFIG_EDGE_MAP[edge]
161 | _CONFIG_DRIVE_MAP[drive]
162 | _CONFIG_BIAS_MAP[bias]
163 | _CONFIG_CLOCK_MAP[clock]
164 | _CONFIG_ACTIVE[active]
165 )
167 debounce = line_config.get("debounce", None)
168 debounce = None if debounce is None else int(debounce * 1_000_000)
170 return line_config["line"], flags, debounce
173def encode_config_lines(
174 lines: Sequence[tuple[int, LineFlag, Union[int, None]]], raw_config: Union[gpio_v2_line_config, None]
175) -> gpio_v2_line_config:
176 flags = collections.defaultdict(list)
177 debounces = collections.defaultdict(list)
178 for line, flag, debounce in lines:
179 flags[flag].append(line)
180 if debounce is not None:
181 debounces[debounce].append(line)
183 if len(flags) + len(debounces) > MAX_ATTRS: 183 ↛ 184line 183 didn't jump to line 184 because the condition on line 183 was never true
184 raise ValueError("Config exceeds maximum custom")
186 if raw_config is None:
187 raw_config = gpio_v2_line_config()
189 flags_lines = collections.Counter({flag: len(lines) for flag, lines in flags.items()})
190 general_flags, _ = flags_lines.most_common(1)[0]
191 flags.pop(general_flags)
192 raw_config.flags = general_flags
194 line_indexes = sequence_indexes(line[0] for line in lines)
195 for idx, (flag, flag_lines) in enumerate(flags.items()):
196 raw_config.attrs[idx].mask = index_mask(line_indexes, flag_lines)
197 raw_config.attrs[idx].attr.id = LineAttrId.FLAGS
198 raw_config.attrs[idx].attr.flags = flag
200 for idx, (debounce, debounce_lines) in enumerate(debounces.items(), len(flags)):
201 raw_config.attrs[idx].mask = index_mask(line_indexes, debounce_lines)
202 raw_config.attrs[idx].attr.id = LineAttrId.DEBOUNCE
203 raw_config.attrs[idx].attr.debounce_period_us = debounce
205 raw_config.num_attrs = len(flags) + len(debounces)
207 return raw_config
210def encode_config(config: list[dict], raw_config: Union[gpio_v2_line_config, None] = None) -> gpio_v2_line_config:
211 lines = [encode_line_config(line_config) for line_config in config]
212 return encode_config_lines(lines, raw_config)
215def encode_request(config: dict, raw_request: Union[gpio_v2_line_request, None] = None) -> gpio_v2_line_request:
216 if raw_request is None: 216 ↛ 218line 216 didn't jump to line 218 because the condition on line 216 was always true
217 raw_request = gpio_v2_line_request()
218 raw_request.consumer = config.get("name", "linuxpy").encode()
219 config_lines = config["lines"]
220 lines = [config_line["line"] for config_line in config_lines]
221 raw_request.num_lines = len(lines)
222 raw_request.offsets[: len(lines)] = lines
224 encode_config(config_lines, raw_request.config)
225 return raw_request
228def parse_config_line(line):
229 if isinstance(line, int):
230 return {"line": line}
231 return line
234def parse_config_lines(lines: Union[Collection, int]) -> list[dict]:
235 if isinstance(lines, int):
236 lines = [lines]
237 if isinstance(lines, dict):
238 return [{**cfg, "line": line} for line, cfg in lines.items()]
239 if isinstance(lines, (list, tuple)): 239 ↛ 241line 239 didn't jump to line 241 because the condition on line 239 was always true
240 return [parse_config_line(line) for line in lines]
241 raise TypeError("lines must be an int, a sequence of int or dict or dict")
244def parse_config(config: Optional[Union[Collection, int]], nb_lines: int) -> dict:
245 if config is None:
246 result = {"lines": tuple(range(nb_lines))}
247 elif isinstance(config, int):
248 result = {"lines": (config,)}
249 elif isinstance(config, Sequence):
250 result = {"lines": config}
251 elif isinstance(config, Mapping): 251 ↛ 254line 251 didn't jump to line 254 because the condition on line 251 was always true
252 result = config
253 else:
254 raise TypeError("config must be an int, list or dict")
255 result["lines"] = parse_config_lines(result["lines"])
256 result.setdefault("name", "linuxpy")
257 return result
260def CLine(nb, direction="", bias="", drive="", edge="", clock="", debounce=None):
261 result = {"line": nb}
262 if direction:
263 result["direction"] = direction
264 if bias:
265 result["bias"] = bias
266 if drive:
267 result["drive"] = drive
268 if edge:
269 result["edge"] = edge
270 if clock:
271 result["clock"] = clock
272 if debounce is not None:
273 result["debounce"] = debounce
274 return result
277CLineIn = functools.partial(CLine, direction="input")
278CLineOut = functools.partial(CLine, direction="output")
281def Config(lines, name="linuxpy"):
282 return {"name": name, "lines": parse_config_lines(lines)}