Coverage for linuxpy/gpio/config.py: 84%
145 statements
« prev ^ index » next coverage.py v7.6.8, created at 2025-05-27 13:54 +0200
« prev ^ index » next coverage.py v7.6.8, created at 2025-05-27 13:54 +0200
1#
2# This file is part of the linuxpy project
3#
4# Copyright (c) 2024 Tiago Coutinho
5# Distributed under the GPLv3 license. See LICENSE for more info.
8"""
line_config = {
    "line": 5,
    "direction": "output" | "input",
    "active": "" | "high" | "low",               # "" means active high
    "bias": "" | "pull-up" | "pull-down" | "disabled",
    "clock": "" | "realtime" | "hte",            # "" defaults to monotonic; HTE: hardware timestamp engine

    "drive": "" | "drain" | "source",            # direction must be output; "" means push-pull
    "debounce": "" | float,                      # direction must be output; "" means don't set it

    "edge": "" | "rising" | "falling" | "both",  # direction must be input
}

config = {
    "lines": 1 | [1, 2, 3],

}
25"""
27import collections
28import functools
29from collections.abc import Mapping
31from linuxpy.types import Collection, Optional, Sequence, Union
32from linuxpy.util import index_mask, sentinel, sequence_indexes
34from .raw import MAX_ATTRS, LineAttrId, LineFlag, gpio_v2_line_config, gpio_v2_line_request
def check_line_config(line_config: dict):
    """Validate a single line configuration dictionary.

    Checks the mandatory ``"line"`` entry and the optional ``"direction"``,
    ``"bias"``, ``"clock"``, ``"active"``, ``"drive"``, ``"debounce"`` and
    ``"edge"`` entries. String values are compared case-insensitively.

    Args:
        line_config: dictionary describing one line. ``"line"`` is required;
            ``"direction"`` defaults to ``"input"``.

    Raises:
        KeyError: if ``"line"`` is missing.
        ValueError: if an entry has an unsupported value or is not allowed
            for the configured direction.
    """
    line = line_config["line"]
    if not isinstance(line, int) or line < 0:
        raise ValueError("line must be an int >= 0")

    # Unknown or non-string directions are reported as a ValueError instead of
    # leaking the KeyError/AttributeError from the enum lookup.
    try:
        direction = LineFlag[line_config.get("direction", "input").upper()]
    except (AttributeError, KeyError):
        raise ValueError("direction can only be output or input") from None

    # `sentinel` marks "key not present" so that any explicit value is checked.
    bias = line_config.get("bias", sentinel)
    clock = line_config.get("clock", sentinel)
    active = line_config.get("active", sentinel)
    drive = line_config.get("drive", sentinel)
    debounce = line_config.get("debounce", sentinel)
    edge = line_config.get("edge", sentinel)

    if direction == LineFlag.INPUT:
        if drive is not sentinel:
            raise ValueError("Can only set drive on output lines")
        # NOTE(review): the Linux GPIO v2 uAPI appears to tie debounce to input
        # lines; the existing output-only rule is kept here - confirm intent.
        if debounce is not sentinel:
            raise ValueError("Can only set debounce on output lines")
        if edge is not sentinel:
            if edge.lower() not in ("rising", "falling", "both"):
                raise ValueError("edge must be 'rising', 'falling' or 'both'")
    elif direction == LineFlag.OUTPUT:
        if edge is not sentinel:
            raise ValueError("Can only set edge on input lines")
        if drive is not sentinel:
            if drive.lower() not in ("push-pull", "drain", "source"):
                raise ValueError("drive must be 'drain', 'source' or 'push-pull'")
        if debounce is not sentinel:
            # Integers are valid numbers too; bool is excluded because it is
            # an int subclass but not a meaningful period.
            is_number = isinstance(debounce, (int, float)) and not isinstance(debounce, bool)
            if not is_number or debounce < 0:
                raise ValueError("debounce must be a positive number")
    else:
        # Reached for names that are valid LineFlag members but not directions.
        raise ValueError("direction can only be output or input")

    if active is not sentinel:
        if active.lower() not in ("high", "low"):
            raise ValueError("active must be 'high' or 'low'")

    if bias is not sentinel:
        if bias.lower() not in ("pull-up", "pull-down"):
            raise ValueError("bias must be 'pull-up' or 'pull-down'")

    if clock is not sentinel:
        if clock.lower() not in ("monotonic", "realtime", "hte"):
            raise ValueError("clock must be 'monotonic', 'realtime' or 'hte'")
# Lookup table for the "direction" entry of a line configuration.
# Unset spellings ("" and "none") fall back to an input line; LineFlag
# members map onto themselves.
_CONFIG_DIRECTION_MAP: dict[Union[str, LineFlag], LineFlag] = {
    # textual spellings
    "input": LineFlag.INPUT,
    "output": LineFlag.OUTPUT,
    "none": LineFlag.INPUT,
    "": LineFlag.INPUT,
    # already-encoded flags
    LineFlag.INPUT: LineFlag.INPUT,
    LineFlag.OUTPUT: LineFlag.OUTPUT,
}
# Lookup table for the "edge" entry of a line configuration.
# "both" combines the rising and falling flags; "" and "none" select no edge
# flag at all. LineFlag values map onto themselves.
_CONFIG_EDGE_MAP: dict[Union[str, LineFlag], LineFlag] = {
    # textual spellings
    "rising": LineFlag.EDGE_RISING,
    "falling": LineFlag.EDGE_FALLING,
    "both": LineFlag.EDGE_RISING | LineFlag.EDGE_FALLING,
    "": LineFlag(0),
    "none": LineFlag(0),
    # already-encoded flags
    LineFlag(0): LineFlag(0),
    LineFlag.EDGE_RISING: LineFlag.EDGE_RISING,
    LineFlag.EDGE_FALLING: LineFlag.EDGE_FALLING,
    LineFlag.EDGE_RISING | LineFlag.EDGE_FALLING: LineFlag.EDGE_RISING | LineFlag.EDGE_FALLING,
}
# Lookup table for the "drive" entry of a line configuration.
# "push-pull", "" and "none" add no flag; "drain" / "source" select the
# open-drain / open-source flags. LineFlag values map onto themselves.
_CONFIG_DRIVE_MAP: dict[Union[str, LineFlag], LineFlag] = {
    # textual spellings
    "drain": LineFlag.OPEN_DRAIN,
    "source": LineFlag.OPEN_SOURCE,
    "push-pull": LineFlag(0),
    "": LineFlag(0),
    "none": LineFlag(0),
    # already-encoded flags
    LineFlag.OPEN_DRAIN: LineFlag.OPEN_DRAIN,
    LineFlag.OPEN_SOURCE: LineFlag.OPEN_SOURCE,
    LineFlag(0): LineFlag(0),
}
# Lookup table for the "bias" entry of a line configuration.
# "" and "none" add no bias flag. LineFlag values map onto themselves.
_CONFIG_BIAS_MAP: dict[Union[str, LineFlag], LineFlag] = {
    # textual spellings
    "pull-up": LineFlag.BIAS_PULL_UP,
    "pull-down": LineFlag.BIAS_PULL_DOWN,
    "": LineFlag(0),
    "none": LineFlag(0),
    # already-encoded flags
    LineFlag.BIAS_PULL_UP: LineFlag.BIAS_PULL_UP,
    LineFlag.BIAS_PULL_DOWN: LineFlag.BIAS_PULL_DOWN,
    LineFlag(0): LineFlag(0),
}
# Lookup table for the "clock" entry of a line configuration.
# "monotonic" and "" add no clock flag; "realtime" and "hte" select the
# matching event clock flag. LineFlag values map onto themselves.
_CONFIG_CLOCK_MAP: dict[Union[str, LineFlag], LineFlag] = {
    # textual spellings
    "realtime": LineFlag.EVENT_CLOCK_REALTIME,
    "hte": LineFlag.EVENT_CLOCK_HTE,
    "": LineFlag(0),
    "monotonic": LineFlag(0),
    # already-encoded flags
    LineFlag.EVENT_CLOCK_REALTIME: LineFlag.EVENT_CLOCK_REALTIME,
    LineFlag.EVENT_CLOCK_HTE: LineFlag.EVENT_CLOCK_HTE,
    LineFlag(0): LineFlag(0),
}
# Lookup table for the "active" entry of a line configuration.
# Only "low" adds a flag (ACTIVE_LOW); "high" and "" add none.
# LineFlag values map onto themselves.
_CONFIG_ACTIVE: dict[Union[str, LineFlag], LineFlag] = {
    "high": LineFlag(0),
    "low": LineFlag.ACTIVE_LOW,
    LineFlag.ACTIVE_LOW: LineFlag.ACTIVE_LOW,
    "": LineFlag(0),
    LineFlag(0): LineFlag(0),
}
def _lower_if_str(value):
    """Return *value* lower-cased when it is a string, unchanged otherwise."""
    return value.lower() if isinstance(value, str) else value


def encode_line_config(line_config: dict) -> tuple[int, LineFlag, Union[int, None]]:
    """Encode one line configuration dictionary.

    Each optional entry (``"direction"``, ``"edge"``, ``"drive"``, ``"bias"``,
    ``"clock"``, ``"active"``) is translated through its lookup table and the
    resulting flags are OR-ed together. Values may be strings (matched
    case-insensitively) or ``LineFlag`` values, which the tables also accept.

    Args:
        line_config: dictionary describing one line; ``"line"`` is required.

    Returns:
        A ``(line, flags, debounce)`` tuple where ``debounce`` is the
        ``"debounce"`` value multiplied by 1_000_000 and rounded to an int,
        or ``None`` when no debounce was given.

    Raises:
        KeyError: if ``"line"`` is missing or an entry has a value that is
            not present in its lookup table.
    """

    def lookup(table, key, default):
        # Only strings are case-folded, so LineFlag keys reach the tables
        # untouched instead of failing on a missing `.lower()`.
        return table[_lower_if_str(line_config.get(key, default))]

    flags = (
        lookup(_CONFIG_DIRECTION_MAP, "direction", "input")
        | lookup(_CONFIG_EDGE_MAP, "edge", "")
        | lookup(_CONFIG_DRIVE_MAP, "drive", "")
        | lookup(_CONFIG_BIAS_MAP, "bias", "")
        | lookup(_CONFIG_CLOCK_MAP, "clock", "")
        | lookup(_CONFIG_ACTIVE, "active", "")
    )

    debounce = line_config.get("debounce")
    if debounce is not None:
        # round() rather than int(): a float product can land just below the
        # intended integer and truncation would then drop a whole unit.
        debounce = round(debounce * 1_000_000)

    return line_config["line"], flags, debounce
def encode_config_lines(
    lines: Sequence[tuple[int, LineFlag, Union[int, None]]], raw_config: Union[gpio_v2_line_config, None]
) -> gpio_v2_line_config:
    """Fill a raw line config structure from encoded ``(line, flags, debounce)`` tuples.

    The flag combination shared by the most lines is stored as the config's
    general ``flags``. Every other flag combination, and every distinct
    debounce value, is stored as one attribute together with a mask computed
    by ``index_mask`` for the lines it applies to.

    Args:
        lines: encoded lines, as produced by ``encode_line_config``.
        raw_config: structure to fill; a new ``gpio_v2_line_config`` is
            created when ``None``.

    Returns:
        The filled structure (the given ``raw_config`` or the new one).

    Raises:
        ValueError: if ``lines`` is empty or more than ``MAX_ATTRS``
            attributes would be needed.
    """
    lines_by_flags = collections.defaultdict(list)
    lines_by_debounce = collections.defaultdict(list)
    for line, flag, debounce in lines:
        lines_by_flags[flag].append(line)
        if debounce is not None:
            lines_by_debounce[debounce].append(line)

    if not lines_by_flags:
        raise ValueError("Config must contain at least one line")

    # The most common flag combination becomes the general flags and needs no
    # attribute slot; on ties the first combination encountered wins.
    general_flags = max(lines_by_flags, key=lambda flag: len(lines_by_flags[flag]))
    del lines_by_flags[general_flags]

    # Count attributes only after removing the general flags: they do not
    # occupy an attribute slot.
    num_attrs = len(lines_by_flags) + len(lines_by_debounce)
    if num_attrs > MAX_ATTRS:
        raise ValueError("Config exceeds maximum number of custom attributes")

    if raw_config is None:
        raw_config = gpio_v2_line_config()
    raw_config.flags = general_flags

    line_indexes = sequence_indexes(line for line, _, _ in lines)

    for idx, (flag, flag_lines) in enumerate(lines_by_flags.items()):
        attr = raw_config.attrs[idx]
        attr.mask = index_mask(line_indexes, flag_lines)
        attr.attr.id = LineAttrId.FLAGS
        attr.attr.flags = flag

    # Debounce attributes follow directly after the flag attributes.
    for idx, (debounce, debounce_lines) in enumerate(lines_by_debounce.items(), len(lines_by_flags)):
        attr = raw_config.attrs[idx]
        attr.mask = index_mask(line_indexes, debounce_lines)
        attr.attr.id = LineAttrId.DEBOUNCE
        attr.attr.debounce_period_us = debounce

    raw_config.num_attrs = num_attrs

    return raw_config
def encode_config(config: list[dict], raw_config: Union[gpio_v2_line_config, None] = None) -> gpio_v2_line_config:
    """Encode a list of line configuration dictionaries into a raw line config.

    Each dictionary is encoded with ``encode_line_config`` and the results are
    handed to ``encode_config_lines`` together with ``raw_config``.
    """
    encoded_lines = []
    for line_config in config:
        encoded_lines.append(encode_line_config(line_config))
    return encode_config_lines(encoded_lines, raw_config)
def encode_request(config: dict, raw_request: Union[gpio_v2_line_request, None] = None) -> gpio_v2_line_request:
    """Fill a raw line request structure from a parsed configuration.

    Args:
        config: dictionary with a ``"lines"`` list of line configuration
            dictionaries and an optional ``"name"`` (default ``"linuxpy"``)
            stored, encoded, as the request's consumer.
        raw_request: structure to fill; a new ``gpio_v2_line_request`` is
            created when ``None``.

    Returns:
        The filled request structure.
    """
    request = gpio_v2_line_request() if raw_request is None else raw_request
    request.consumer = config.get("name", "linuxpy").encode()

    line_configs = config["lines"]
    offsets = [line_config["line"] for line_config in line_configs]
    count = len(offsets)
    request.num_lines = count
    request.offsets[:count] = offsets

    # The nested config structure is filled in place.
    encode_config(line_configs, request.config)
    return request
def parse_config_line(line):
    """Normalize one line description.

    An int is wrapped as ``{"line": <int>}``; anything else is returned as is.
    """
    return {"line": line} if isinstance(line, int) else line
def parse_config_lines(lines: Union[Collection, int]) -> list[dict]:
    """Normalize the accepted ``lines`` forms into a list of line dictionaries.

    Accepted forms:

    * an int: a single line number;
    * a mapping of line number to configuration dictionary;
    * a non-string sequence (list, tuple, range, ...) whose items are line
      numbers or line configuration dictionaries.

    Args:
        lines: the lines description in one of the forms above.

    Returns:
        A list with one item per line; ints become ``{"line": <int>}`` and
        mapping entries become their configuration plus a ``"line"`` key.

    Raises:
        TypeError: if ``lines`` has any other type (including ``str``/``bytes``).
    """
    if isinstance(lines, int):
        lines = [lines]
    if isinstance(lines, Mapping):
        return [{**cfg, "line": line} for line, cfg in lines.items()]
    # Strings are sequences too but never a valid list of lines.
    if isinstance(lines, Sequence) and not isinstance(lines, (str, bytes, bytearray)):
        return [{"line": item} if isinstance(item, int) else item for item in lines]
    raise TypeError("lines must be an int, a sequence of int or dict, or a dict")
def parse_config(config: Optional[Union[Collection, int]], nb_lines: int) -> dict:
    """Normalize a user supplied configuration into a configuration dictionary.

    Args:
        config: ``None`` (use lines ``0 .. nb_lines - 1``), a single line
            number, a sequence of lines, or a mapping with a ``"lines"`` entry.
        nb_lines: number of lines, used only when ``config`` is ``None``.

    Returns:
        A new dictionary whose ``"lines"`` entry has been normalized by
        ``parse_config_lines`` and whose ``"name"`` defaults to ``"linuxpy"``.
        A mapping passed as ``config`` is copied, not modified.

    Raises:
        TypeError: if ``config`` is of an unsupported type.
        KeyError: if a mapping ``config`` has no ``"lines"`` entry.
    """
    if config is None:
        result = {"lines": tuple(range(nb_lines))}
    elif isinstance(config, int):
        result = {"lines": (config,)}
    elif isinstance(config, Sequence):
        result = {"lines": config}
    elif isinstance(config, Mapping):
        # Shallow copy: the caller's mapping must not be rewritten, and
        # read-only mappings have to work as well.
        result = dict(config)
    else:
        raise TypeError("config must be an int, list or dict")
    result["lines"] = parse_config_lines(result["lines"])
    result.setdefault("name", "linuxpy")
    return result
def CLine(nb, direction="", bias="", drive="", edge="", clock="", debounce=None, active=""):
    """Build a line configuration dictionary.

    Only the options that are actually given end up in the result, so the
    defaults of the consumers of the dictionary stay in effect for the rest.

    Args:
        nb: line number, stored under ``"line"``.
        direction: stored under ``"direction"`` when non-empty.
        bias: stored under ``"bias"`` when non-empty.
        drive: stored under ``"drive"`` when non-empty.
        edge: stored under ``"edge"`` when non-empty.
        clock: stored under ``"clock"`` when non-empty.
        debounce: stored under ``"debounce"`` when not ``None`` (so ``0`` is kept).
        active: stored under ``"active"`` when non-empty.

    Returns:
        The line configuration dictionary.
    """
    result = {"line": nb}
    optional = (
        ("direction", direction),
        ("bias", bias),
        ("drive", drive),
        ("edge", edge),
        ("clock", clock),
        ("active", active),
    )
    for key, value in optional:
        if value:
            result[key] = value
    # debounce is tested against None, not truthiness, so 0 is preserved.
    if debounce is not None:
        result["debounce"] = debounce
    return result
# Shortcuts for CLine with the direction pre-set; every other CLine argument
# can still be passed through.
CLineIn = functools.partial(CLine, direction="input")
CLineOut = functools.partial(CLine, direction="output")
def Config(lines, name="linuxpy"):
    """Build a configuration dictionary from a lines description.

    Args:
        lines: any form accepted by ``parse_config_lines``.
        name: value stored under ``"name"``.

    Returns:
        ``{"name": name, "lines": <normalized lines>}``.
    """
    parsed_lines = parse_config_lines(lines)
    return {"name": name, "lines": parsed_lines}