Coverage for linuxpy/gpio/device.py: 99%
266 statements
« prev ^ index » next coverage.py v7.6.8, created at 2025-05-27 13:54 +0200
« prev ^ index » next coverage.py v7.6.8, created at 2025-05-27 13:54 +0200
1#
2# This file is part of the linuxpy project
3#
4# Copyright (c) 2024 Tiago Coutinho
5# Distributed under the GPLv3 license. See LICENSE for more info.
7"""
8Human friendly interface to linux GPIO subsystem.
10The heart of linuxpy GPIO library is the [`Device`][linuxpy.gpio.device.Device]
11class.
12The recommended way is to use one of the find methods to create a Device object
13and use it within a context manager like:
15```python
16from linuxpy.gpio.device import find, LineFlag
18with find() as gpio:
19 # request lines 5 and 6
20 lines = gpio[5, 6]
21 lines.flags = LineFlag.ACTIVE_LOW
22 with lines:
23 print(lines[:])
25```
26"""
28import collections
29import contextlib
30import fcntl
31import functools
32import os
33import pathlib
35from linuxpy.ctypes import sizeof, u32
36from linuxpy.device import BaseDevice, ReentrantOpen, iter_device_files
37from linuxpy.ioctl import ioctl
38from linuxpy.types import AsyncIterator, Collection, FDLike, Iterable, Optional, PathLike, Sequence, Union
39from linuxpy.util import (
40 async_event_stream as async_events,
41 bit_indexes,
42 chunks,
43 event_stream as events,
44 index_mask,
45 make_find,
46 sentinel,
47 sequence_indexes,
48 to_fd,
49)
51from . import raw
52from .config import encode_request, parse_config
53from .raw import IOC
# Names re-exported from the raw bindings so callers of this module can use
# them without importing `raw` themselves.
LineFlag = raw.LineFlag
LineAttrId = raw.LineAttrId
LineEventId = raw.LineEventId
LineChangedType = raw.LineChangedType

# Chip level records. `ChipInfo.lines` is the value reported by the chip info
# ioctl, while `Info.lines` holds one `LineInfo` per line (see `get_info`).
ChipInfo = collections.namedtuple("ChipInfo", "name label lines")
Info = collections.namedtuple("Info", "name label lines")

# Line level records: static information, line events read from a request and
# line info (configuration) change events read from the chip.
LineInfo = collections.namedtuple("LineInfo", "name consumer line flags attributes")
LineEvent = collections.namedtuple("LineEvent", "timestamp type line sequence line_sequence")
LineInfoEvent = collections.namedtuple("LineInfoEvent", "name consumer line flags attributes timestamp type")
class LineAttributes:
    """Optional attributes attached to a line.

    Instances start with neutral values and are filled in by
    `decode_line_info` from the attributes reported for the line.
    """

    def __init__(self):
        # Flags carried by a FLAGS attribute (no flag set by default)
        self.flags = LineFlag(0)
        # Bit indexes decoded from an OUTPUT_VALUES attribute
        self.indexes = []
        # Debounce period in seconds; None when no DEBOUNCE attribute is present
        self.debounce_period = None
def decode_line_info(info: raw.gpio_v2_line_info) -> LineInfo:
    """Convert a raw line info structure into a `LineInfo`

    Args:
        info (raw.gpio_v2_line_info): raw line information

    Returns:
        LineInfo: decoded line information, including its attributes
    """
    decoded = LineAttributes()
    # Only the first `num_attrs` entries of `attrs` are decoded
    for attr in (info.attrs[position] for position in range(info.num_attrs)):
        kind = attr.id
        if kind == LineAttrId.FLAGS:
            decoded.flags = LineFlag(attr.flags)
        elif kind == LineAttrId.OUTPUT_VALUES:
            decoded.indexes = bit_indexes(attr.values)
        elif kind == LineAttrId.DEBOUNCE:
            # microseconds -> seconds
            decoded.debounce_period = attr.debounce_period_us / 1_000_000

    return LineInfo(
        name=info.name.decode(),
        consumer=info.consumer.decode(),
        line=info.offset,
        flags=LineFlag(info.flags),
        attributes=decoded,
    )
def get_chip_info(fd: FDLike) -> ChipInfo:
    """Query the chip information through the CHIPINFO ioctl

    Args:
        fd (FDLike): a gpiochip file number or file like object

    Returns:
        ChipInfo: name, label and lines reported for the given file descriptor
    """
    chip = raw.gpiochip_info()
    ioctl(fd, IOC.CHIPINFO, chip)
    name = chip.name.decode()
    label = chip.label.decode()
    return ChipInfo(name=name, label=label, lines=chip.lines)
def get_line_info(fd: FDLike, line: int) -> LineInfo:
    """Query the information of a single line through the GET_LINEINFO ioctl

    Args:
        fd (FDLike): a gpiochip file number or file like object
        line (int): number (offset) of the line to query

    Returns:
        LineInfo: decoded information for the given line and chip
    """
    line_info = raw.gpio_v2_line_info(offset=line)
    ioctl(fd, IOC.GET_LINEINFO, line_info)
    return decode_line_info(line_info)
def get_info(fd: FDLike) -> Info:
    """Read the full information of a chip: chip info plus every line info

    Args:
        fd (FDLike): a gpiochip file number or file like object

    Returns:
        Info: chip name and label together with a list of `LineInfo`,
            one per line of the chip
    """
    name, label, nb_lines = get_chip_info(fd)
    line_infos = [get_line_info(fd, offset) for offset in range(nb_lines)]
    return Info(name, label, line_infos)
def raw_request(fd: FDLike, request: raw.gpio_v2_line_request, blocking=False):
    """Send an already encoded line request to the chip

    Args:
        fd (FDLike): a gpiochip file number or file like object
        request (raw.gpio_v2_line_request): the encoded request. It is passed
            to the GET_LINE ioctl and its `fd` field is used afterwards
        blocking (bool, optional): when False (default) the request file
            descriptor is switched to non blocking mode

    Returns:
        raw.gpio_v2_line_request: the same request object given as argument
    """
    ioctl(fd, IOC.GET_LINE, request)
    if blocking:
        return request
    # add O_NONBLOCK on top of whatever status flags are already set
    status = fcntl.fcntl(request.fd, fcntl.F_GETFL)
    fcntl.fcntl(request.fd, fcntl.F_SETFL, status | os.O_NONBLOCK)
    return request
def request(fd: FDLike, config: dict, blocking: bool = False) -> raw.gpio_v2_line_request:
    """Reserve the line(s) described by the given configuration

    Args:
        fd (FDLike): a gpiochip file number or file like object
        config (dict): line config as specified in GPIO line configuration request
        blocking (bool, optional): Make the returned FD blocking. Defaults to False.

    Returns:
        raw.gpio_v2_line_request: The details of the request. Field `fd` contains the new open file descriptor
    """
    encoded = encode_request(config)
    raw_request(fd, encoded, blocking=blocking)
    return encoded
def get_values(req_fd: FDLike, mask: int) -> raw.gpio_v2_line_values:
    """Read the values of the lines selected by mask.

    Args:
        req_fd (FDLike): file number or file like object of a line request
        mask (int): a bitmap identifying the lines, with each bit number corresponding to
                    the index of the line reserved in the given req_fd.

    Returns:
        raw.gpio_v2_line_values: the current line values
    """
    payload = raw.gpio_v2_line_values(mask=mask)
    return ioctl(req_fd, IOC.LINE_GET_VALUES, payload)
def set_values(req_fd: FDLike, mask: int, bits: int) -> raw.gpio_v2_line_values:
    """
    Write the values of the lines selected by mask.

    Args:
        req_fd (FDLike): file number or file like object of a line request
        mask: a bitmap identifying the lines, with each bit number corresponding to
              the index of the line reserved in the given req_fd.
        bits: a bitmap containing the value of the lines, set to 1 for active and 0
              for inactive.

    Returns:
        raw.gpio_v2_line_values: the underlying object sent to the ioctl call
    """
    payload = raw.gpio_v2_line_values(mask=mask, bits=bits)
    return ioctl(req_fd, IOC.LINE_SET_VALUES, payload)
def set_config(req_fd: FDLike, flags: LineFlag):
    """Apply new flags to a line request through the LINE_SET_CONFIG ioctl

    Args:
        req_fd (FDLike): file number or file like object of a line request
        flags (LineFlag): flags stored in the line config sent to the ioctl

    Returns:
        the result of the ioctl call
    """
    line_config = raw.gpio_v2_line_config(flags=flags)
    return ioctl(req_fd, IOC.LINE_SET_CONFIG, line_config)
def read_one_event(req_fd: FDLike) -> LineEvent:
    """Read one event from the given request file descriptor

    Args:
        req_fd (FDLike): file number or file like object of a line request

    Returns:
        LineEvent: the decoded event. Its `timestamp` is the raw
            `timestamp_ns` field converted from nanoseconds to seconds
    """
    # Same helper used by read_one_line_info_event to resolve the descriptor
    fd = to_fd(req_fd)
    # Read exactly one event structure worth of bytes
    data = os.read(fd, sizeof(raw.gpio_v2_line_event))
    event = raw.gpio_v2_line_event.from_buffer_copy(data)
    return LineEvent(
        timestamp=event.timestamp_ns * 1e-9,
        type=LineEventId(event.id),
        line=event.offset,
        sequence=event.seqno,
        line_sequence=event.line_seqno,
    )
def watch_line_info(fd: FDLike, line: int):
    """Register a line for info events through the GET_LINEINFO_WATCH ioctl

    Args:
        fd (FDLike): a gpiochip file number or file like object
        line (int): number (offset) of the line to watch

    Returns:
        the result of the ioctl call
    """
    line_info = raw.gpio_v2_line_info(offset=line)
    return ioctl(fd, IOC.GET_LINEINFO_WATCH, line_info)
def unwatch_line_info(fd: FDLike, line: int):
    """Unregister a line from info events through the LINEINFO_UNWATCH ioctl

    Args:
        fd (FDLike): a gpiochip file number or file like object
        line (int): number (offset) of the line to stop watching

    Returns:
        the result of the ioctl call
    """
    # this ioctl receives the bare line number as a 32 bit unsigned integer
    offset = u32(line)
    return ioctl(fd, IOC.LINEINFO_UNWATCH, offset)
def read_one_line_info_event(fd: FDLike) -> LineInfoEvent:
    """Read one line info (configuration change) event from the given chip

    Args:
        fd (FDLike): a gpiochip file number or file like object

    Returns:
        LineInfoEvent: the decoded line information plus the event timestamp
            (converted from nanoseconds to seconds) and the type of change
    """
    chip_fd = to_fd(fd)
    size = sizeof(raw.gpio_v2_line_info_changed)
    changed = raw.gpio_v2_line_info_changed.from_buffer_copy(os.read(chip_fd, size))
    decoded = decode_line_info(changed.info)
    return LineInfoEvent(
        name=decoded.name,
        consumer=decoded.consumer,
        line=decoded.line,
        flags=decoded.flags,
        attributes=decoded.attributes,
        timestamp=changed.timestamp_ns * 1e-9,
        type=LineChangedType(changed.event_type),
    )
# Synchronous streams: the generic stream helper bound to the reader that
# decodes line events (from requests) or line info events (from chips).
event_stream = functools.partial(events, read=read_one_event)
event_info_stream = functools.partial(events, read=read_one_line_info_event)

# Asynchronous counterparts of the two streams above.
async_event_stream = functools.partial(async_events, read=read_one_event)
async_event_info_stream = functools.partial(async_events, read=read_one_line_info_event)
def expand_from_list(key: Union[int, slice, tuple], minimum, maximum) -> list[int]:
    """Used internally in __getitem__ to expand the given key

    Args:
        key (Union[int, slice, tuple]): a line number, a slice of line numbers
            or a collection of line numbers and/or slices
        minimum: first line number implied by an open ended slice
        maximum: one past the last line number implied by an open ended slice

    Returns:
        list[int]: the flat list of line numbers described by key
    """
    if isinstance(key, int):
        return [key]
    if isinstance(key, slice):
        start, stop, step = key.start, key.stop, key.step
        if step is None or step >= 0:
            # Forward slice: an open start begins at the smallest line
            # (a zero step still raises ValueError when the range is sliced)
            if start is None:
                start = minimum
        elif stop is None and minimum > 0:
            # Backward slice: an open start already begins at maximum - 1;
            # an open stop must end on `minimum` (inclusive) instead of 0
            stop = minimum - 1
        return list(range(maximum)[start:stop:step])
    return [line for item in key for line in expand_from_list(item, minimum, maximum)]
class _Request(ReentrantOpen):
    """Raw line request. Not to be used directly

    Holds the file descriptor of one line request (`-1` while closed) and the
    index of each requested line, used to build value bitmaps.
    """

    def __init__(
        self,
        device: "Device",
        config: dict,
        blocking: bool = False,
    ):
        self.device = device
        self.config = config
        self.blocking = blocking
        # line number -> index used as bit position in value bitmaps
        self.line_indexes = sequence_indexes(entry["line"] for entry in config["lines"])
        self.fd = -1
        super().__init__()

    @property
    def name(self) -> str:
        """Requestor name

        Changing the requestor name must be done before the request is open
        to take effect

        Returns:
            str: consumer name
        """
        return self.config["name"]

    @name.setter
    def name(self, name: str) -> None:
        """Set requestor name. Must be called before the request is open
        to take effect

        Args:
            name (str): new requestor name
        """
        self.config["name"] = name

    def fileno(self) -> int:
        """File number of the request (-1 when the request is not open)"""
        return self.fd

    def close(self):
        """Close the request file. Nothing is done if it is not open"""
        if self.fd >= 0:
            os.close(self.fd)
            self.fd = -1

    def open(self):
        """Send the request to the device and keep the resulting file descriptor"""
        line_request = request(self.device, self.config, self.blocking)
        self.fd = line_request.fd

    def get_values(self, lines: Sequence[int]) -> dict[int, int]:
        """Read the values of the given lines

        Args:
            lines (Sequence[int]): line numbers belonging to this request

        Returns:
            dict[int, int]: key is line number and value its value (0 or 1)
        """
        indexes = self.line_indexes
        mask = index_mask(indexes, lines)
        bits = get_values(self, mask).bits
        return {line: (bits >> indexes[line]) & 1 for line in lines}

    def set_values(self, values: dict[int, Union[int, bool]]) -> raw.gpio_v2_line_values:
        """Write new values on the given lines

        Args:
            values (dict[int, Union[int, bool]]): key is line number and value its value

        Returns:
            raw.gpio_v2_line_values: the result of the module level `set_values` call
        """
        mask = bits = 0
        for line, value in values.items():
            line_bit = 1 << self.line_indexes[line]
            mask |= line_bit
            if value:
                bits |= line_bit
        return set_values(self, mask, bits)
class Request(ReentrantOpen):
    """A lazy request to reserve lines on a chip

    Preferred creation from the `Device.request()` method.

    The configured lines are split with `chunks(lines, 64)` and each chunk is
    backed by its own `_Request` (and therefore its own file descriptor).
    """

    def __init__(self, device, config: dict, blocking: bool = False):
        self.config = config
        # work on a shallow copy: the caller's config must keep its "lines"
        cfg = dict(config)
        lines = cfg.pop("lines")
        self.line_requests: list[_Request] = []
        # line number -> underlying request responsible for that line
        self.line_map: dict[int, _Request] = {}
        for chunk in chunks(lines, 64):
            line_request = _Request(device, {**cfg, "lines": chunk}, blocking)
            self.line_requests.append(line_request)
            for line in chunk:
                self.line_map[line["line"]] = line_request
        super().__init__()

    def __len__(self):
        """Number of lines in the request"""
        return len(self.config["lines"])

    @property
    def lines(self):
        """List of line numbers in the request, in configuration order"""
        return [line["line"] for line in self.config["lines"]]

    @property
    def name(self) -> str:
        """Requestor name

        Changing the requestor name must be done before the request is open
        to take effect

        Returns:
            str: consumer name
        """
        return self.config["name"]

    @name.setter
    def name(self, name: str) -> None:
        """Set requestor name. Must be called before the request is open
        to take effect

        Args:
            name (str): new requestor name
        """
        self.config["name"] = name
        for req in self.line_requests:
            req.name = name

    def __getitem__(self, key: Union[int, tuple, slice]) -> Union[int, dict]:
        """Reads lines values

        Args:
            key (Union[int, tuple, slice]): a line number, a slice, or a list of line numbers or slices

        Returns:
            Union[int, dict]: the line value when key is a line number, otherwise
                a dict where key is line number and value its value
        """
        if isinstance(key, int):
            return self.get_values((key,))[key]
        lines = expand_from_list(key, self.min_line, self.max_line + 1)
        return self.get_values(lines)

    def __setitem__(self, key: Union[int, tuple, slice], value: Union[int, Sequence[int]]):
        """Sets the given lines values

        Args:
            key (Union[int, tuple, slice]): a line number, a slice, or a list of line numbers or slices
            value (Union[int, Sequence[int]]): the value(s) to write for the given lines

        Raises:
            ValueError: if key is a line number and value is not a number (0 or 1)
        """
        if isinstance(key, int):
            if not isinstance(value, int):
                raise ValueError("set value for single line must be 0 or 1")
            values = {key: value}
        else:
            lines = expand_from_list(key, self.min_line, self.max_line + 1)
            if isinstance(value, int):
                # a single value is applied to every selected line
                value = len(lines) * (value,)
            values = dict(zip(lines, value))
        self.set_values(values)

    def __iter__(self) -> Iterable[LineEvent]:
        """Infinite stream of line events

        Returns:
            Iterable[LineEvent]: the stream of events
        """
        return event_stream(self.filenos())

    def __aiter__(self) -> AsyncIterator[LineEvent]:
        """Asynchronous stream of line events

        Returns:
            AsyncIterator[LineEvent]: the asynchronous stream of events
        """
        return async_event_stream(self.filenos())

    @property
    def min_line(self) -> int:
        """The smallest line number in the request

        Returns:
            int: The smallest line number in the request
        """
        # computed on first access and cached on the instance
        if not hasattr(self, "_min_line"):
            self._min_line = min(self.lines)
        return self._min_line

    @property
    def max_line(self) -> int:
        """The biggest line number in the request

        Returns:
            int: The biggest line number in the request
        """
        # computed on first access and cached on the instance
        if not hasattr(self, "_max_line"):
            self._max_line = max(self.lines)
        return self._max_line

    def filenos(self) -> list[int]:
        """List of underlying request file numbers

        Returns:
            list[int]: List of underlying request file numbers
        """
        return [request.fd for request in self.line_requests]

    def close(self):
        """Closes the underlying request files. If request is not
        open nothing is done.
        """
        for line_request in self.line_requests:
            line_request.close()

    def open(self):
        """Opens the underlying request files effectively reserving the lines

        If one of the underlying requests fails to open, the ones already
        opened by this call are closed again before the error propagates, so
        a failed open does not leave file descriptors (and lines) reserved.
        """
        with contextlib.ExitStack() as stack:
            for line_request in self.line_requests:
                line_request.open()
                stack.callback(line_request.close)
            # every request is open: discard the rollback callbacks
            stack.pop_all()

    def get_values(self, lines: Optional[Sequence[int]] = None) -> dict[int, int]:
        """Reads values for the given lines

        Args:
            lines (Optional[Sequence[int]], optional): A collection of lines. Defaults to None. Default means read all lines

        Returns:
            dict[int, int]: line values. Key is line number and value its value
        """
        if lines is None:
            lines = self.lines
        # group the lines by the underlying request that owns them so each
        # request is read only once
        request_lines = {}
        for line in lines:
            request_line = self.line_map[line]
            request_lines.setdefault(request_line, []).append(line)
        result = {}
        for request_line, local_lines in request_lines.items():
            result.update(request_line.get_values(local_lines))
        return result

    def set_values(self, values: dict[int, Union[int, bool]]):
        """Writes new values on the given lines

        Args:
            values (dict[int, Union[int, bool]]): key is line number and value its value
        """
        # group the values by the underlying request that owns each line
        request_lines = {}
        for line, value in values.items():
            request_line = self.line_map[line]
            request_lines.setdefault(request_line, {})[line] = value
        for request_line, local_lines in request_lines.items():
            request_line.set_values(local_lines)
class Device(BaseDevice):
    """A device represents a connection to the underlying gpio chip"""

    PREFIX = "/dev/gpiochip"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # NOTE(review): not referenced by the code visible in this module
        self._watch_lines = set()

    def __len__(self) -> int:
        """The number of lines in this chip

        Returns:
            int: The number of lines in this chip
        """
        # queried from the chip on first use and cached on the instance
        if not hasattr(self, "_len"):
            self._len = get_chip_info(self).lines
        return self._len

    def __getitem__(self, key: Union[int, tuple, slice]) -> Request:
        """create a request for the given lines. Equivalent to `device.request(key)`

        !!! note

            The request is not active after this call. You need to use the request object returned
            by this method in a context manager or manually call open/close.

        Args:
            key (Union[int, tuple, slice]): the line number, slice or a list of line numbers, or slices

        Returns:
            Request: A new request object
        """
        lines = expand_from_list(key, 0, len(self))
        return self.request(lines)

    def __iter__(self) -> Iterable[LineInfoEvent]:
        """Infinite stream of line config events

        Returns:
            Iterable[LineInfoEvent]: the stream of line config events
        """
        return event_info_stream((self,))

    def __aiter__(self) -> AsyncIterator[LineInfoEvent]:
        """Asynchronous stream of line config events

        Returns:
            AsyncIterator[LineInfoEvent]: the asynchronous stream of line info events
        """
        return async_event_info_stream((self,))

    def info_stream(self, lines: Collection[int]) -> Iterable[LineInfoEvent]:
        """Register for watching line config events on the given lines
        and stream them

        Args:
            lines (Collection[int]): line numbers to watch for config events

        Returns:
            Iterable[LineInfoEvent]: the stream of line config events
        """
        with self.watching(lines):
            yield from iter(self)

    async def async_info_stream(self, lines: Collection[int]) -> AsyncIterator[LineInfoEvent]:
        """Register for watching line config events on the given lines
        and async stream them

        Args:
            lines (Collection[int]): line numbers to watch for config events

        Returns:
            AsyncIterator[LineInfoEvent]: the asynchronous stream of line info events
        """
        with self.watching(lines):
            # aclosing makes sure the inner stream is closed when this
            # generator is closed or fails
            async with contextlib.aclosing(self.__aiter__()) as stream:
                async for event in stream:
                    yield event

    def get_info(self) -> Info:
        """
        Reads all information available including chip info and detailed information
        about each chip line information

        Returns:
            Info: The full chip information
        """
        return get_info(self)

    def watch_line(self, line: int):
        """Register the given line for config events

        Args:
            line (int): the line number to register
        """
        watch_line_info(self, line)

    def unwatch_line(self, line: int):
        """Unregister the given line from config events

        Args:
            line (int): the line number to unregister
        """
        unwatch_line_info(self, line)

    def watch_lines(self, lines: Collection[int]):
        """Register the given lines for config events

        Args:
            lines (Collection[int]): the line numbers to register
        """
        for line in lines:
            self.watch_line(line)

    def unwatch_lines(self, lines: Collection[int]):
        """Unregister the given lines from config events

        Args:
            lines (Collection[int]): the line numbers to unregister
        """
        for line in lines:
            self.unwatch_line(line)

    @contextlib.contextmanager
    def watching(self, lines):
        """A context manager during which the given lines are registered
        for line config events

        Lines are registered one by one. On exit, or if registering one of
        them fails, every line registered so far is unregistered again.

        Args:
            lines (Iterable[int]): the line numbers to listen for config events
        """
        # Remember what was actually registered: this keeps the cleanup
        # correct after a partial failure and when `lines` can only be
        # iterated once (e.g. a generator)
        watched = []
        try:
            for line in lines:
                self.watch_line(line)
                watched.append(line)
            yield
        finally:
            self.unwatch_lines(watched)

    def request(
        self, config: Optional[Union[Collection, list]] = None, blocking: Union[bool, object] = sentinel
    ) -> Request:
        """Create a request to reserve a list of lines on this chip

        !!! note

            The request is not active after this call. You need to use the request object returned
            by this method in a context manager or manually call open/close.

        Args:
            config (Optional[Union[Collection, list]], optional): lines configuration. It is
                normalized by `parse_config` together with the number of lines of this chip
            blocking (Union[bool, sentinel], optional): blocking mode. When not given,
                the blocking mode of this device (`is_blocking`) is used

        Returns:
            Request: A new request object
        """
        if blocking is sentinel:
            blocking = self.is_blocking
        config = parse_config(config, len(self))
        return Request(self, config, blocking)
def iter_gpio_files(path: PathLike = "/dev") -> Iterable[pathlib.Path]:
    """Returns an iterator over all GPIO chip files.

    !!! warning
        Only files for which the current user has read and write access are returned

    Args:
        path (PathLike, optional): root path. Defaults to "/dev".

    Returns:
        Iterable[pathlib.Path]: an iterator over the gpiochip files found on the system
    """
    # Derive the glob from Device.PREFIX ("gpiochip*") so only gpiochip files
    # match; a bare "gpio*" also picks up unrelated nodes such as gpiomem
    pattern = f"{pathlib.PurePath(Device.PREFIX).name}*"
    return iter_device_files(path=path, pattern=pattern)
def iter_devices(path: PathLike = "/dev", **kwargs) -> Iterable[Device]:
    """Returns an iterator over all GPIO chip devices

    Args:
        path (PathLike, optional): root path. Defaults to "/dev".
        **kwargs: extra keyword arguments forwarded to each `Device`

    Returns:
        Iterable[Device]: an iterator over the gpiochip devices found on the system
    """
    gpio_files = iter_gpio_files(path=path)
    # devices are created lazily, one per file, as the result is consumed
    return (Device(gpio_file, **kwargs) for gpio_file in gpio_files)
# `find` is built by `make_find` on top of `iter_devices`; the module
# docstring shows the intended usage (`with find() as gpio: ...`).
find = make_find(
    iter_devices,
    needs_open=False,
)