Coverage for linuxpy/gpio/device.py: 99%
266 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-10-21 07:51 +0200
« prev ^ index » next coverage.py v7.6.1, created at 2024-10-21 07:51 +0200
1#
2# This file is part of the linuxpy project
3#
4# Copyright (c) 2024 Tiago Coutinho
5# Distributed under the GPLv3 license. See LICENSE for more info.
"""
Human friendly interface to the Linux GPIO subsystem.

The heart of the linuxpy GPIO library is the [`Device`][linuxpy.gpio.device.Device]
class.
The recommended way is to use one of the find methods to create a Device object
and use it within a context manager like:

```python
from linuxpy.gpio.device import find, LineFlag

with find() as gpio:
    # request lines 5 and 6
    lines = gpio[5, 6]
    lines.flags = LineFlag.ACTIVE_LOW
    with lines:
        print(lines[:])

```
"""
28import collections
29import contextlib
30import fcntl
31import functools
32import os
33import pathlib
35from linuxpy.ctypes import sizeof, u32
36from linuxpy.device import BaseDevice, ReentrantOpen, iter_device_files
37from linuxpy.ioctl import ioctl
38from linuxpy.types import AsyncIterator, Collection, FDLike, Iterable, Optional, PathLike, Sequence, Union
39from linuxpy.util import (
40 aclosing,
41 async_event_stream as async_events,
42 bit_indexes,
43 chunks,
44 event_stream as events,
45 index_mask,
46 make_find,
47 sentinel,
48 sequence_indexes,
49 to_fd,
50)
52from . import raw
53from .config import encode_request, parse_config
54from .raw import IOC
# Plain tuple records returned by the helper functions of this module.
Info = collections.namedtuple("Info", ["name", "label", "lines"])
ChipInfo = collections.namedtuple("ChipInfo", ["name", "label", "lines"])
LineInfo = collections.namedtuple("LineInfo", ["name", "consumer", "line", "flags", "attributes"])
LineEvent = collections.namedtuple("LineEvent", ["timestamp", "type", "line", "sequence", "line_sequence"])
LineInfoEvent = collections.namedtuple(
    "LineInfoEvent",
    ["name", "consumer", "line", "flags", "attributes", "timestamp", "type"],
)

# Enumerations re-exported from the raw module so users can import them from here.
LineFlag = raw.LineFlag
LineAttrId = raw.LineAttrId
LineEventId = raw.LineEventId
LineChangedType = raw.LineChangedType
class LineAttributes:
    """Mutable holder for the optional attributes attached to a line info record.

    Instances start with neutral defaults; `decode_line_info` overwrites the
    fields for which the kernel reported an attribute.
    """

    def __init__(self):
        # No flag set by default
        self.flags = LineFlag(0)
        # Filled with the result of bit_indexes() for an OUTPUT_VALUES attribute
        self.indexes = []
        # Debounce period in seconds, or None when no DEBOUNCE attribute is present
        self.debounce_period = None
def decode_line_info(info: raw.gpio_v2_line_info) -> LineInfo:
    """Convert a raw line info structure into a friendly `LineInfo` tuple.

    Args:
        info (raw.gpio_v2_line_info): structure filled in by the kernel

    Returns:
        LineInfo: decoded name, consumer, line offset, flags and attributes
    """
    decoded = LineAttributes()
    # Only the first `num_attrs` entries of the attribute array are inspected
    for position in range(info.num_attrs):
        entry = info.attrs[position]
        kind = entry.id
        if kind == LineAttrId.FLAGS:
            decoded.flags = LineFlag(entry.flags)
            continue
        if kind == LineAttrId.OUTPUT_VALUES:
            decoded.indexes = bit_indexes(entry.values)
            continue
        if kind == LineAttrId.DEBOUNCE:
            # microseconds -> seconds
            decoded.debounce_period = entry.debounce_period_us / 1_000_000

    return LineInfo(
        name=info.name.decode(),
        consumer=info.consumer.decode(),
        line=info.offset,
        flags=LineFlag(info.flags),
        attributes=decoded,
    )
def get_chip_info(fd: FDLike) -> ChipInfo:
    """Query the chip for its name, label and number of lines.

    Args:
        fd (FDLike): a gpiochip file number or file like object

    Returns:
        ChipInfo: chip info of the given file descriptor
    """
    chip = raw.gpiochip_info()
    # The ioctl fills the structure in place
    ioctl(fd, IOC.CHIPINFO, chip)
    name, label = chip.name.decode(), chip.label.decode()
    return ChipInfo(name=name, label=label, lines=chip.lines)
def get_line_info(fd: FDLike, line: int) -> LineInfo:
    """Query the chip for the information of a single line.

    Args:
        fd (FDLike): a gpiochip file number or file like object
        line (int): desired line to get information

    Returns:
        LineInfo: information for the given line and chip
    """
    query = raw.gpio_v2_line_info()
    # Tell the kernel which line is being asked about
    query.offset = line
    ioctl(fd, IOC.GET_LINEINFO, query)
    return decode_line_info(query)
def get_info(fd: FDLike) -> Info:
    """Read the chip information together with the information of each line.

    Args:
        fd (FDLike): a gpiochip file number or file like object

    Returns:
        Info: information for the given chip including all its lines
    """
    name, label, line_count = get_chip_info(fd)
    line_infos = []
    # One query per line, for offsets 0 .. line_count - 1
    for offset in range(line_count):
        line_infos.append(get_line_info(fd, offset))
    return Info(name, label, line_infos)
def raw_request(fd: FDLike, request: raw.gpio_v2_line_request, blocking=False):
    """Send an already encoded line request to the chip.

    Args:
        fd (FDLike): a gpiochip file number or file like object
        request (raw.gpio_v2_line_request): the encoded request. It is filled
            in place by the ioctl; its `fd` field is read afterwards.
        blocking (bool, optional): leave the request FD as returned by the
            ioctl instead of setting O_NONBLOCK on it. Defaults to False.

    Returns:
        raw.gpio_v2_line_request: the same request object that was given
    """
    ioctl(fd, IOC.GET_LINE, request)
    if blocking:
        return request
    # Add O_NONBLOCK on top of whatever status flags the new descriptor has
    line_fd = request.fd
    status = fcntl.fcntl(line_fd, fcntl.F_GETFL)
    fcntl.fcntl(line_fd, fcntl.F_SETFL, status | os.O_NONBLOCK)
    return request
def request(fd: FDLike, config: dict, blocking: bool = False) -> raw.gpio_v2_line_request:
    """Reserve the line(s) described by the given configuration.

    Args:
        fd (FDLike): a gpiochip file number or file like object
        config (dict): line config as specified in GPIO line configuration request
        blocking (bool, optional): Make the return FD blocking. Defaults to False.

    Returns:
        raw.gpio_v2_line_request: The details of the request. Field `fd` contains the new open file descriptor
    """
    # raw_request hands back the very object it was given, now filled in
    return raw_request(fd, encode_request(config), blocking=blocking)
def get_values(req_fd: FDLike, mask: int) -> raw.gpio_v2_line_values:
    """Read the values of the lines selected by the mask.

    Args:
        req_fd (FDLike): a line request file number or file like object
        mask (int): a bitmap identifying the lines, with each bit number corresponding to
                    the index of the line reserved in the given req_fd.

    Returns:
        raw.gpio_v2_line_values: the current line values
    """
    return ioctl(req_fd, IOC.LINE_GET_VALUES, raw.gpio_v2_line_values(mask=mask))
def set_values(req_fd: FDLike, mask: int, bits: int) -> raw.gpio_v2_line_values:
    """Write the values of the lines selected by the mask.

    Args:
        req_fd (FDLike): a line request file number or file like object
        mask: a bitmap identifying the lines, with each bit number corresponding to
              the index of the line reserved in the given req_fd.
        bits: a bitmap containing the value of the lines, set to 1 for active and 0
              for inactive.

    Returns:
        raw.gpio_v2_line_values: the underlying object sent to the ioctl call
    """
    payload = raw.gpio_v2_line_values(mask=mask, bits=bits)
    outcome = ioctl(req_fd, IOC.LINE_SET_VALUES, payload)
    return outcome
def set_config(req_fd: FDLike, flags: LineFlag):
    """Apply a new set of flags to an existing line request.

    Args:
        req_fd (FDLike): a line request file number or file like object
        flags (LineFlag): flags placed in the configuration sent to the kernel

    Returns:
        whatever the underlying `ioctl` helper returns
    """
    return ioctl(req_fd, IOC.LINE_SET_CONFIG, raw.gpio_v2_line_config(flags=flags))
def read_one_event(req_fd: FDLike) -> LineEvent:
    """Read one event from the given request file descriptor

    Args:
        req_fd (FDLike): a line request file number, or an object with a
            `fileno()` method

    Returns:
        LineEvent: the decoded event. The timestamp is converted from
            nanoseconds to seconds.
    """
    if isinstance(req_fd, int):
        fileno = req_fd
    else:
        fileno = req_fd.fileno()
    # Read exactly one event structure worth of bytes
    payload = os.read(fileno, sizeof(raw.gpio_v2_line_event))
    record = raw.gpio_v2_line_event.from_buffer_copy(payload)
    return LineEvent(
        timestamp=record.timestamp_ns * 1e-9,
        type=LineEventId(record.id),
        line=record.offset,
        sequence=record.seqno,
        line_sequence=record.line_seqno,
    )
def watch_line_info(fd: FDLike, line: int):
    """Start watching the given line for line info changes.

    Args:
        fd (FDLike): a gpiochip file number or file like object
        line (int): line to be watched

    Returns:
        whatever the underlying `ioctl` helper returns
    """
    query = raw.gpio_v2_line_info()
    query.offset = line
    return ioctl(fd, IOC.GET_LINEINFO_WATCH, query)
def unwatch_line_info(fd: FDLike, line: int):
    """Stop watching the given line for line info changes.

    Args:
        fd (FDLike): a gpiochip file number or file like object
        line (int): line to stop watching

    Returns:
        whatever the underlying `ioctl` helper returns
    """
    # The ioctl receives the line number as an unsigned 32 bit integer
    offset = u32(line)
    return ioctl(fd, IOC.LINEINFO_UNWATCH, offset)
def read_one_line_info_event(fd: FDLike) -> LineInfoEvent:
    """Read one line info change event from the given chip file descriptor.

    Args:
        fd (FDLike): a gpiochip file number or file like object

    Returns:
        LineInfoEvent: the decoded line information plus the event timestamp
            (converted from nanoseconds to seconds) and the type of change
    """
    raw_fd = to_fd(fd)
    # Read exactly one "info changed" structure worth of bytes
    payload = os.read(raw_fd, sizeof(raw.gpio_v2_line_info_changed))
    changed = raw.gpio_v2_line_info_changed.from_buffer_copy(payload)
    name, consumer, line, flags, attributes = decode_line_info(changed.info)
    return LineInfoEvent(
        name=name,
        consumer=consumer,
        line=line,
        flags=flags,
        attributes=attributes,
        timestamp=changed.timestamp_ns * 1e-9,
        type=LineChangedType(changed.event_type),
    )
# Synchronous streams: the generic stream helper bound to a specific reader
event_stream = functools.partial(events, read=read_one_event)
event_info_stream = functools.partial(events, read=read_one_line_info_event)

# Asynchronous counterparts of the streams above
async_event_stream = functools.partial(async_events, read=read_one_event)
async_event_info_stream = functools.partial(async_events, read=read_one_line_info_event)
def expand_from_list(key: Union[int, slice, tuple], minimum, maximum) -> list[int]:
    """Used internally in __getitem__ to expand the given key

    An int expands to a one element list. A slice is applied to
    `range(maximum)`, with a missing start replaced by `minimum`. Any other
    key is treated as an iterable of keys which are expanded recursively and
    concatenated in order.
    """
    if isinstance(key, int):
        return [key]
    if not isinstance(key, slice):
        expanded = []
        for item in key:
            expanded.extend(expand_from_list(item, minimum, maximum))
        return expanded
    first = key.start if key.start is not None else minimum
    return list(range(maximum)[first : key.stop : key.step])
class _Request(ReentrantOpen):
    """Raw line request. Not to be used directly"""

    def __init__(
        self,
        device: "Device",
        config: dict,
        blocking: bool = False,
    ):
        self.device = device
        self.config = config
        self.blocking = blocking
        # Index of each line number inside this request, built by sequence_indexes()
        self.line_indexes = sequence_indexes(entry["line"] for entry in config["lines"])
        # A negative descriptor means "not open"
        self.fd = -1
        super().__init__()

    @property
    def name(self) -> str:
        """Requestor name

        Changing the requestor name must be done before the request is open
        to take effect

        Returns:
            str: consumer name
        """
        return self.config["name"]

    @name.setter
    def name(self, name: str) -> None:
        """Set requestor name. Must be called before the request is open
        to take effect

        Args:
            name (str): new requestor name
        """
        self.config["name"] = name

    def fileno(self) -> int:
        """Return the request file descriptor (-1 when not open)"""
        return self.fd

    def close(self):
        """Close the request file descriptor. Does nothing when not open"""
        if self.fd >= 0:
            os.close(self.fd)
            self.fd = -1

    def open(self):
        """Send the request to the device and keep the resulting file descriptor"""
        reply = request(self.device, self.config, self.blocking)
        self.fd = reply.fd

    def get_values(self, lines: Sequence[int]) -> dict[int, int]:
        """Read the given lines and return a dict of line number to value (0 or 1)"""
        mask = index_mask(self.line_indexes, lines)
        reply = get_values(self, mask)
        result = {}
        for line in lines:
            # Pick the bit located at the index of this line in the request
            result[line] = (reply.bits >> self.line_indexes[line]) & 1
        return result

    def set_values(self, values: dict[int, Union[int, bool]]) -> raw.gpio_v2_line_values:
        """Write the given values. Keys are line numbers, truthy values mean active"""
        mask = bits = 0
        for line, value in values.items():
            bit = 1 << self.line_indexes[line]
            mask |= bit
            if value:
                bits |= bit
        return set_values(self, mask, bits)
class Request(ReentrantOpen):
    """A lazy request to reserve lines on a chip

    Preferred creation from the `Device.request()` method.

    The configured lines are split in chunks of at most 64 lines. Each chunk
    is served by its own underlying raw request with its own file descriptor.
    """

    def __init__(self, device, config: dict, blocking: bool = False):
        self.config = config
        cfg = dict(config)
        lines = cfg.pop("lines")
        self.line_requests: list[_Request] = []
        # Maps each line number to the raw request responsible for it
        self.line_map: dict[int, _Request] = {}
        # NOTE(review): 64 presumably mirrors the kernel per-request limit
        # (GPIO_V2_LINES_MAX) - confirm against the uAPI headers
        for chunk in chunks(lines, 64):
            line_request = _Request(device, {**cfg, "lines": chunk}, blocking)
            self.line_requests.append(line_request)
            for line in chunk:
                self.line_map[line["line"]] = line_request
        super().__init__()

    def __len__(self):
        """Number of lines configured in this request"""
        return len(self.config["lines"])

    @property
    def lines(self):
        """List of line numbers in this request, in configuration order"""
        return [line["line"] for line in self.config["lines"]]

    @property
    def name(self) -> str:
        """Requestor name

        Changing the requestor name must be done before the request is open
        to take effect

        Returns:
            str: consumer name
        """
        return self.config["name"]

    @name.setter
    def name(self, name: str) -> None:
        """Set requestor name. Must be called before the request is open
        to take effect

        Args:
            name (str): new requestor name
        """
        self.config["name"] = name
        # Each raw request holds its own copy of the configuration
        for req in self.line_requests:
            req.name = name

    def __getitem__(self, key: Union[int, tuple, slice]) -> Union[int, dict]:
        """Reads lines values

        Args:
            key (Union[int, tuple, slice]): a line number, a slice, or a list of line numbers or slices

        Returns:
            Union[int, dict]: the line value if key is a line number, otherwise
                a dict where key is line number and value its value

        Raises:
            KeyError: if a requested line is not part of this request
        """
        if isinstance(key, int):
            return self.get_values((key,))[key]
        # Open ended slices are bounded by the smallest and biggest requested lines
        lines = expand_from_list(key, self.min_line, self.max_line + 1)
        return self.get_values(lines)

    def __setitem__(self, key: Union[int, tuple, slice], value: Union[int, Sequence[int]]):
        """Sets the given lines values

        Args:
            key (Union[int, tuple, slice]): a line number, a slice, or a list of line numbers or slices
            value (Union[int, Sequence[int]]): the value(s) to write for the given lines.
                A single number is applied to all the selected lines.

        Raises:
            ValueError: if key is a line number and value is not a number (0 or 1)
            KeyError: if a selected line is not part of this request
        """
        if isinstance(key, int):
            if not isinstance(value, int):
                raise ValueError("set value for single line must be 0 or 1")
            values = {key: value}
        else:
            lines = expand_from_list(key, self.min_line, self.max_line + 1)
            if isinstance(value, int):
                value = len(lines) * (value,)
            values = dict(zip(lines, value))
        self.set_values(values)

    def __iter__(self) -> Iterable[LineEvent]:
        """Infinite stream of line events

        Returns:
            Iterable[LineEvent]: the stream of events
        """
        return event_stream(self.filenos())

    def __aiter__(self) -> AsyncIterator[LineEvent]:
        """Asynchronous stream of line events

        Returns:
            AsyncIterator[LineEvent]: the asynchronous stream of events
        """
        return async_event_stream(self.filenos())

    @property
    def min_line(self) -> int:
        """The smallest line number in the request

        Returns:
            int: The smallest line number in the request
        """
        # Computed on first access and cached on the instance
        if not hasattr(self, "_min_line"):
            self._min_line = min(self.lines)
        return self._min_line

    @property
    def max_line(self) -> int:
        """The biggest line number in the request

        Returns:
            int: The biggest line number in the request
        """
        # Computed on first access and cached on the instance
        if not hasattr(self, "_max_line"):
            self._max_line = max(self.lines)
        return self._max_line

    def filenos(self) -> list[int]:
        """List of underlying request file numbers

        Returns:
            list[int]: List of underlying request file numbers
        """
        return [line_request.fd for line_request in self.line_requests]

    def close(self):
        """Closes the underlying request files. If request is not
        open nothing is done.
        """
        for line_request in self.line_requests:
            line_request.close()

    def open(self):
        """Opens the underlying request files effectively reserving the lines

        If opening one of the underlying requests fails, the ones already
        opened by this call are closed again before the error is propagated,
        so a failed open does not leave file descriptors (and reserved lines)
        behind.
        """
        with contextlib.ExitStack() as rollback:
            for line_request in self.line_requests:
                line_request.open()
                rollback.callback(line_request.close)
            # Everything opened fine: discard the registered rollback actions
            rollback.pop_all()

    def get_values(self, lines: Optional[Sequence[int]] = None) -> dict[int, int]:
        """Reads values for the given lines

        Args:
            lines (Optional[Sequence[int]], optional): A collection of lines. Defaults to None. Default means read all lines

        Returns:
            dict[int, int]: line values. Key is line number and value its value

        Raises:
            KeyError: if a line is not part of this request
        """
        if lines is None:
            lines = self.lines
        # Group the lines by the raw request that owns them
        request_lines = {}
        for line in lines:
            request_line = self.line_map[line]
            request_lines.setdefault(request_line, []).append(line)
        result = {}
        for request_line, local_lines in request_lines.items():
            result.update(request_line.get_values(local_lines))
        return result

    def set_values(self, values: dict[int, Union[int, bool]]):
        """Writes new values on the given lines

        Args:
            values (dict[int, Union[int, bool]]): key is line number and value its value

        Raises:
            KeyError: if a line is not part of this request
        """
        # Group the values by the raw request that owns each line
        request_lines = {}
        for line, value in values.items():
            request_line = self.line_map[line]
            request_lines.setdefault(request_line, {})[line] = value
        for request_line, local_lines in request_lines.items():
            request_line.set_values(local_lines)
class Device(BaseDevice):
    """A device represents a connection to the underlying gpio chip"""

    PREFIX = "/dev/gpiochip"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._watch_lines = set()

    def __len__(self) -> int:
        """The number of lines in this chip

        The value is read from the chip on first use and cached afterwards.

        Returns:
            int: The number of lines in this chip
        """
        try:
            return self._len
        except AttributeError:
            pass
        self._len = get_chip_info(self).lines
        return self._len

    def __getitem__(self, key: Union[int, tuple, slice]) -> Request:
        """create a request for the given lines. Equivalent to `device.request(key)`

        !!! note

            The request is not active after this call. You need to use the request object returned
            by this method in a context manager or manually call open/close.

        Args:
            key (Union[int, tuple, slice]): the line number, slice or a list of line numbers, or slices

        Returns:
            Request: A new request object
        """
        # Open ended slices cover the lines 0 .. len(self) - 1
        selected = expand_from_list(key, 0, len(self))
        return self.request(selected)

    def __iter__(self) -> Iterable[LineInfoEvent]:
        """Infinite stream of line info events

        Returns:
            Iterable[LineInfoEvent]: the stream of line info events
        """
        return event_info_stream((self,))

    def __aiter__(self) -> AsyncIterator[LineInfoEvent]:
        """Asynchronous stream of line info events

        Returns:
            AsyncIterator[LineInfoEvent]: the asynchronous stream of line info events
        """
        return async_event_info_stream((self,))

    def info_stream(self, lines: Collection[int]) -> Iterable[LineInfoEvent]:
        """Stream of line info events for the given lines.

        The lines are watched while the stream is being consumed and
        unwatched when it finishes or is closed.
        """
        with self.watching(lines):
            yield from self

    async def async_info_stream(self, lines: Collection[int]) -> AsyncIterator[LineInfoEvent]:
        """Asynchronous stream of line info events for the given lines.

        The lines are watched while the stream is being consumed and
        unwatched when it finishes or is closed.
        """
        with self.watching(lines):
            async with aclosing(self.__aiter__()) as info_events:
                async for info_event in info_events:
                    yield info_event

    def get_info(self) -> Info:
        """
        Reads all information available including chip info and detailed information
        about each chip line information

        Returns:
            Info: The full chip information
        """
        return get_info(self)

    def watch_line(self, line: int):
        """Start watching the given line for line info changes"""
        watch_line_info(self, line)

    def unwatch_line(self, line: int):
        """Stop watching the given line for line info changes"""
        unwatch_line_info(self, line)

    def watch_lines(self, lines: Collection[int]):
        """Start watching each of the given lines, one at a time"""
        for offset in lines:
            self.watch_line(offset)

    def unwatch_lines(self, lines: Collection[int]):
        """Stop watching each of the given lines, one at a time"""
        for offset in lines:
            self.unwatch_line(offset)

    @contextlib.contextmanager
    def watching(self, lines):
        """Context manager that watches the given lines on enter and unwatches them on exit"""
        self.watch_lines(lines)
        try:
            yield
        finally:
            self.unwatch_lines(lines)

    def request(
        self, config: Optional[Union[Collection, list]] = None, blocking: Union[bool, object] = sentinel
    ) -> Request:
        """Create a request to reserve a list of lines on this chip

        !!! note

            The request is not active after this call. You need to use the request object returned
            by this method in a context manager or manually call open/close.

        Args:
            config (Optional[Union[Collection, list]]): lines configuration. It is
                normalized with `parse_config` using the number of lines of this chip
            blocking (Union[bool, sentinel], optional): blocking mode. When not
                given, the blocking mode of this device (`is_blocking`) is used

        Returns:
            Request: A new request object
        """
        mode = self.is_blocking if blocking is sentinel else blocking
        parsed = parse_config(config, len(self))
        return Request(self, parsed, mode)
def iter_gpio_files(path: PathLike = "/dev") -> Iterable[pathlib.Path]:
    """Returns an iterator over all GPIO chip files.

    Only files whose name starts with `gpiochip` are considered, in line with
    `Device.PREFIX`. Other nodes sharing the `gpio` prefix (for example
    `/dev/gpiomem` on some boards) are not GPIO character devices and are
    left out.

    !!! warning
        Only files for which the current user has read and write access are returned

    Args:
        path (PathLike, optional): root path. Defaults to "/dev".

    Returns:
        Iterable[pathlib.Path]: an iterator over the gpiochip files found on the system
    """
    return iter_device_files(path=path, pattern="gpiochip*")
def iter_devices(path: PathLike = "/dev", **kwargs) -> Iterable[Device]:
    """Returns an iterator over all GPIO chip devices

    Args:
        path (PathLike, optional): root path. Defaults to "/dev".
        **kwargs: extra keyword arguments forwarded to each `Device`

    Returns:
        Iterable[Device]: an iterator over the gpiochip devices found on the system
    """
    filenames = iter_gpio_files(path=path)
    # Devices are created lazily, as the result is iterated
    return (Device(filename, **kwargs) for filename in filenames)
# Device finder built by make_find on top of iter_devices
find = make_find(iter_devices, needs_open=False)