Coverage for linuxpy/gpio/device.py: 99%

266 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2025-05-27 13:54 +0200

1# 

2# This file is part of the linuxpy project 

3# 

4# Copyright (c) 2024 Tiago Coutinho 

5# Distributed under the GPLv3 license. See LICENSE for more info. 

6 

7""" 

8Human friendly interface to linux GPIO subsystem. 

9 

10The heart of linuxpy GPIO library is the [`Device`][linuxpy.gpio.device.Device] 

11class. 

12The recommended way is to use one of the find methods to create a Device object 

13and use it within a context manager like: 

14 

15```python 

16from linuxpy.gpio.device import find, LineFlag 

17 

18with find() as gpio: 

19 # request lines 5 and 6 

20 lines = gpio[5, 6] 

21 lines.flags = LineFlag.ACTIVE_LOW 

22 with lines: 

23 print(lines[:]) 

24 

25``` 

26""" 

27 

28import collections 

29import contextlib 

30import fcntl 

31import functools 

32import os 

33import pathlib 

34 

35from linuxpy.ctypes import sizeof, u32 

36from linuxpy.device import BaseDevice, ReentrantOpen, iter_device_files 

37from linuxpy.ioctl import ioctl 

38from linuxpy.types import AsyncIterator, Collection, FDLike, Iterable, Optional, PathLike, Sequence, Union 

39from linuxpy.util import ( 

40 async_event_stream as async_events, 

41 bit_indexes, 

42 chunks, 

43 event_stream as events, 

44 index_mask, 

45 make_find, 

46 sentinel, 

47 sequence_indexes, 

48 to_fd, 

49) 

50 

51from . import raw 

52from .config import encode_request, parse_config 

53from .raw import IOC 

54 

# Full description of a chip: its name, its label and the list with the
# LineInfo of every line (see get_info).
Info = collections.namedtuple("Info", ["name", "label", "lines"])

# Summary of a chip: its name, its label and the number of lines
# (see get_chip_info).
ChipInfo = collections.namedtuple("ChipInfo", ["name", "label", "lines"])

# Description of a single line (see decode_line_info).
LineInfo = collections.namedtuple(
    "LineInfo",
    ["name", "consumer", "line", "flags", "attributes"],
)

58 

# Re-export the raw enumerations so users only need to import this module.
LineFlag = raw.LineFlag
LineAttrId = raw.LineAttrId
LineEventId = raw.LineEventId
LineChangedType = raw.LineChangedType

# Event read from a line request file descriptor (see read_one_event).
LineEvent = collections.namedtuple(
    "LineEvent",
    ["timestamp", "type", "line", "sequence", "line_sequence"],
)

# Line info change event read from a chip file descriptor
# (see read_one_line_info_event).
LineInfoEvent = collections.namedtuple(
    "LineInfoEvent",
    ["name", "consumer", "line", "flags", "attributes", "timestamp", "type"],
)

65 

66 

class LineAttributes:
    """Mutable holder for the attributes decoded from a line info structure.

    A fresh instance has no flags set, an empty list of indexes and no
    debounce period.
    """

    def __init__(self):
        self.debounce_period = None
        self.indexes = []
        self.flags = LineFlag(0)

72 

73 

def decode_line_info(info: raw.gpio_v2_line_info) -> LineInfo:
    """Convert a raw line info structure into a `LineInfo` tuple

    Args:
        info (raw.gpio_v2_line_info): the raw structure to decode

    Returns:
        LineInfo: the decoded line information
    """
    decoded = LineAttributes()
    # Only the first `num_attrs` entries of `attrs` are decoded
    for position in range(info.num_attrs):
        entry = info.attrs[position]
        kind = entry.id
        if kind == LineAttrId.FLAGS:
            decoded.flags = LineFlag(entry.flags)
        elif kind == LineAttrId.OUTPUT_VALUES:
            decoded.indexes = bit_indexes(entry.values)
        elif kind == LineAttrId.DEBOUNCE:
            # the raw value is in microseconds; expose it in seconds
            decoded.debounce_period = entry.debounce_period_us / 1_000_000

    return LineInfo(
        name=info.name.decode(),
        consumer=info.consumer.decode(),
        line=info.offset,
        flags=LineFlag(info.flags),
        attributes=decoded,
    )

92 

93 

def get_chip_info(fd: FDLike) -> ChipInfo:
    """Query the chip for its name, label and number of lines

    Args:
        fd (FDLike): a gpiochip file number or file like object

    Returns:
        ChipInfo: chip info of the given file descriptor
    """
    chip = raw.gpiochip_info()
    ioctl(fd, IOC.CHIPINFO, chip)
    name = chip.name.decode()
    label = chip.label.decode()
    return ChipInfo(name=name, label=label, lines=chip.lines)

106 

107 

def get_line_info(fd: FDLike, line: int) -> LineInfo:
    """Query the chip for the information of one line

    Args:
        fd (FDLike): a gpiochip file number or file like object
        line (int): desired line to get information

    Returns:
        LineInfo: information for the given line and chip
    """
    line_info = raw.gpio_v2_line_info(offset=line)
    ioctl(fd, IOC.GET_LINEINFO, line_info)
    return decode_line_info(line_info)

121 

122 

def get_info(fd: FDLike) -> Info:
    """Read the full information of the given chip

    Args:
        fd (FDLike): a gpiochip file number or file like object

    Returns:
        Info: information for the given chip including all its lines
    """
    chip = get_chip_info(fd)
    # one query per line, from line 0 up to (excluding) the chip line count
    lines = []
    for line in range(chip.lines):
        lines.append(get_line_info(fd, line))
    return Info(name=chip.name, label=chip.label, lines=lines)

134 

135 

def raw_request(fd: FDLike, request: raw.gpio_v2_line_request, blocking=False):
    """Send an already encoded line request to the chip

    Args:
        fd (FDLike): a gpiochip file number or file like object
        request (raw.gpio_v2_line_request): the encoded request
        blocking (bool, optional): when False (default) the file descriptor
            found in `request.fd` after the ioctl is switched to
            non-blocking mode

    Returns:
        raw.gpio_v2_line_request: the very same `request` object
    """
    ioctl(fd, IOC.GET_LINE, request)
    if blocking:
        return request
    # add O_NONBLOCK on top of whatever status flags are already set
    current = fcntl.fcntl(request.fd, fcntl.F_GETFL)
    fcntl.fcntl(request.fd, fcntl.F_SETFL, current | os.O_NONBLOCK)
    return request

142 

143 

def request(fd: FDLike, config: dict, blocking: bool = False) -> raw.gpio_v2_line_request:
    """Make a request to reserve the given line(s)

    Args:
        fd (FDLike): a gpiochip file number or file like object
        config (dict): line config as specified in GPIO line configuration request
        blocking (bool, optional): Make the returned FD blocking. Defaults to False.

    Returns:
        raw.gpio_v2_line_request: The details of the request. Field `fd` contains the new open file descriptor
    """
    encoded = encode_request(config)
    raw_request(fd, encoded, blocking=blocking)
    return encoded

159 

160 

def get_values(req_fd: FDLike, mask: int) -> raw.gpio_v2_line_values:
    """Read lines values.

    Args:
        req_fd (FDLike): a line request file number or file like object
        mask (int): a bitmap identifying the lines, with each bit number corresponding to
                    the index of the line reserved in the given req_fd.

    Returns:
        raw.gpio_v2_line_values: the current line values
    """
    values = raw.gpio_v2_line_values(mask=mask)
    reply = ioctl(req_fd, IOC.LINE_GET_VALUES, values)
    return reply

174 

175 

def set_values(req_fd: FDLike, mask: int, bits: int) -> raw.gpio_v2_line_values:
    """
    Set lines values.

    Args:
        req_fd (FDLike): a line request file number or file like object
        mask: The mask is a bitmap identifying the lines, with each bit number corresponding to
              the index of the line reserved in the given req_fd.
        bits: The bits is a bitmap containing the value of the lines, set to 1 for active and 0
              for inactive.

    Returns:
        raw.gpio_v2_line_values: the underlying object sent to the ioctl call
    """
    values = raw.gpio_v2_line_values(mask=mask, bits=bits)
    reply = ioctl(req_fd, IOC.LINE_SET_VALUES, values)
    return reply

192 

193 

def set_config(req_fd: FDLike, flags: LineFlag):
    """Apply a new set of flags to an existing line request

    Args:
        req_fd (FDLike): a line request file number or file like object
        flags (LineFlag): the flags to place in the line configuration

    Returns:
        whatever the underlying ioctl call returns
    """
    line_config = raw.gpio_v2_line_config(flags=flags)
    reply = ioctl(req_fd, IOC.LINE_SET_CONFIG, line_config)
    return reply

197 

198 

def read_one_event(req_fd: FDLike) -> LineEvent:
    """Read one event from the given request file descriptor

    Args:
        req_fd (FDLike): a line request file number or an object with a
            `fileno()` method

    Returns:
        LineEvent: the decoded event. The timestamp is converted from
            nanoseconds to seconds
    """
    if isinstance(req_fd, int):
        fileno = req_fd
    else:
        fileno = req_fd.fileno()
    # read exactly one raw event structure worth of bytes
    payload = os.read(fileno, sizeof(raw.gpio_v2_line_event))
    raw_event = raw.gpio_v2_line_event.from_buffer_copy(payload)
    return LineEvent(
        timestamp=raw_event.timestamp_ns * 1e-9,
        type=LineEventId(raw_event.id),
        line=raw_event.offset,
        sequence=raw_event.seqno,
        line_sequence=raw_event.line_seqno,
    )

218 

219 

def watch_line_info(fd: FDLike, line: int):
    """Start watching the given line for line info changes

    Args:
        fd (FDLike): a gpiochip file number or file like object
        line (int): the line number to watch

    Returns:
        whatever the underlying ioctl call returns
    """
    line_info = raw.gpio_v2_line_info(offset=line)
    reply = ioctl(fd, IOC.GET_LINEINFO_WATCH, line_info)
    return reply

223 

224 

def unwatch_line_info(fd: FDLike, line: int):
    """Stop watching the given line for line info changes

    Args:
        fd (FDLike): a gpiochip file number or file like object
        line (int): the line number to stop watching

    Returns:
        whatever the underlying ioctl call returns
    """
    offset = u32(line)
    return ioctl(fd, IOC.LINEINFO_UNWATCH, offset)

227 

228 

def read_one_line_info_event(fd: FDLike) -> LineInfoEvent:
    """Read one line info change event from the given chip file descriptor

    Args:
        fd (FDLike): a gpiochip file number or file like object

    Returns:
        LineInfoEvent: the decoded line information plus the event timestamp
            (converted from nanoseconds to seconds) and the type of change
    """
    fileno = to_fd(fd)
    # read exactly one raw "info changed" structure worth of bytes
    payload = os.read(fileno, sizeof(raw.gpio_v2_line_info_changed))
    changed = raw.gpio_v2_line_info_changed.from_buffer_copy(payload)
    line_info = decode_line_info(changed.info)
    return LineInfoEvent(
        name=line_info.name,
        consumer=line_info.consumer,
        line=line_info.line,
        flags=line_info.flags,
        attributes=line_info.attributes,
        timestamp=changed.timestamp_ns * 1e-9,
        type=LineChangedType(changed.event_type),
    )

243 

244 

# Streams of line events: the generic (async) event stream helpers bound to
# the reader that decodes one LineEvent.
event_stream = functools.partial(events, read=read_one_event)
async_event_stream = functools.partial(async_events, read=read_one_event)

# Streams of line info change events: same helpers bound to the reader that
# decodes one LineInfoEvent.
event_info_stream = functools.partial(events, read=read_one_line_info_event)
async_event_info_stream = functools.partial(async_events, read=read_one_line_info_event)

250 

251 

def expand_from_list(key: Union[int, slice, tuple], minimum, maximum) -> list[int]:
    """Used internally in __getitem__ to expand the given key

    Args:
        key (Union[int, slice, tuple]): a line number, a slice of line
            numbers or an iterable of any of these (nesting is allowed)
        minimum: line number used as start of a forward slice that has no
            explicit start. Also the lowest line produced by a reversed
            slice that has neither start nor stop
        maximum: exclusive upper bound for the line numbers a slice expands to

    Returns:
        list[int]: the flat list of line numbers, in the order given by key
    """
    if isinstance(key, int):
        return [key]
    if isinstance(key, slice):
        candidates = range(maximum)
        reverse = key.step is not None and key.step < 0
        if reverse and key.start is None:
            # A reversed slice starts from the top: forcing `minimum` as its
            # start (as done for forward slices) would walk from the lowest
            # line downwards and return the wrong lines.
            lines = candidates[key]
            if key.stop is None:
                # open ended: do not run below the lowest valid line
                return [line for line in lines if line >= minimum]
            return list(lines)
        start = minimum if key.start is None else key.start
        return list(candidates[slice(start, key.stop, key.step)])
    return [line for item in key for line in expand_from_list(item, minimum, maximum)]

261 

262 

class _Request(ReentrantOpen):
    """Raw line request. Not to be used directly"""

    def __init__(
        self,
        device: "Device",
        config: dict,
        blocking: bool = False,
    ):
        self.device = device
        self.config = config
        self.blocking = blocking
        # index of each line number inside this request, as computed by
        # sequence_indexes from the order of the lines in the config
        self.line_indexes = sequence_indexes(entry["line"] for entry in config["lines"])
        # a negative file descriptor means "not open"
        self.fd = -1
        super().__init__()

    @property
    def name(self) -> str:
        """Requestor name

        Changing the requestor name must be done before the request is open
        to take effect

        Returns:
            str: consumer name
        """
        return self.config["name"]

    @name.setter
    def name(self, name: str) -> None:
        """Set requestor name. Must be called before the request is open
        to take effect

        Args:
            name (str): new requestor name
        """
        self.config["name"] = name

    def fileno(self) -> int:
        """File number of the request (-1 when it is not open)"""
        return self.fd

    def close(self):
        """Close the request file. Nothing is done if it is not open"""
        if self.fd >= 0:
            os.close(self.fd)
            self.fd = -1

    def open(self):
        """Send the request to the device and keep the resulting file number"""
        reply = request(self.device, self.config, self.blocking)
        self.fd = reply.fd

    def get_values(self, lines: Sequence[int]) -> dict[int, int]:
        """Read the values of the given lines

        Args:
            lines (Sequence[int]): line numbers belonging to this request

        Returns:
            dict[int, int]: key is line number and value its value (0 or 1)
        """
        mask = index_mask(self.line_indexes, lines)
        reply = get_values(self, mask)
        result = {}
        for line in lines:
            # the bit position of a line is its index inside the request
            result[line] = (reply.bits >> self.line_indexes[line]) & 1
        return result

    def set_values(self, values: dict[int, Union[int, bool]]) -> raw.gpio_v2_line_values:
        """Write new values on the given lines

        Args:
            values (dict[int, Union[int, bool]]): key is line number and value its value

        Returns:
            raw.gpio_v2_line_values: the result of the module level set_values call
        """
        mask = bits = 0
        for line, value in values.items():
            bit = 1 << self.line_indexes[line]
            mask |= bit
            if value:
                bits |= bit
        return set_values(self, mask, bits)

327 

328 

class Request(ReentrantOpen):
    """A lazy request to reserve lines on a chip

    Preferred creation from the `Device.request()` method.

    The configured lines are split with `chunks(lines, 64)` and each chunk
    is served by its own underlying raw request.
    """

    def __init__(self, device, config: dict, blocking: bool = False):
        self.config = config
        # shallow copy so that popping "lines" does not alter the given config
        cfg = dict(config)
        lines = cfg.pop("lines")
        self.line_requests: list[_Request] = []
        # line number -> raw request responsible for that line
        self.line_map: dict[int, _Request] = {}
        for chunk in chunks(lines, 64):
            line_request = _Request(device, {**cfg, "lines": chunk}, blocking)
            self.line_requests.append(line_request)
            for line in chunk:
                self.line_map[line["line"]] = line_request
        super().__init__()

    def __len__(self):
        """The number of lines in this request"""
        return len(self.config["lines"])

    @property
    def lines(self):
        """The list of line numbers in this request, in configuration order"""
        return [line["line"] for line in self.config["lines"]]

    @property
    def name(self) -> str:
        """Requestor name

        Changing the requestor name must be done before the request is open
        to take effect

        Returns:
            str: consumer name
        """
        return self.config["name"]

    @name.setter
    def name(self, name: str) -> None:
        """Set requestor name. Must be called before the request is open
        to take effect

        Args:
            name (str): new requestor name
        """
        self.config["name"] = name
        for req in self.line_requests:
            req.name = name

    def __getitem__(self, key: Union[int, tuple, slice]) -> Union[int, dict]:
        """Reads lines values

        Args:
            key (Union[int, tuple, slice]): a line number, a slice, or a list of line numbers or slices

        Returns:
            Union[int, dict]: the line value if key is a line number. Otherwise
                a dict where key is line number and value its value
        """
        if isinstance(key, int):
            return self.get_values((key,))[key]
        lines = expand_from_list(key, self.min_line, self.max_line + 1)
        return self.get_values(lines)

    def __setitem__(self, key: Union[int, tuple, slice], value: Union[int, Sequence[int]]):
        """Sets the given lines values

        Args:
            key (Union[int, tuple, slice]): a line number, a slice, or a list of line numbers or slices
            value (Union[int, Sequence[int]]): the value(s) to write for the given lines

        Raises:
            ValueError: if key is a line number and value is not a number (0 or 1)
        """
        if isinstance(key, int):
            if not isinstance(value, int):
                raise ValueError("set value for single line must be 0 or 1")
            values = {key: value}
        else:
            lines = expand_from_list(key, self.min_line, self.max_line + 1)
            if isinstance(value, int):
                # a single value is applied to every selected line
                value = len(lines) * (value,)
            values = dict(zip(lines, value))
        self.set_values(values)

    def __iter__(self) -> Iterable[LineEvent]:
        """Stream of line events

        Returns:
            Iterable[LineEvent]: the stream of events
        """
        return event_stream(self.filenos())

    def __aiter__(self) -> AsyncIterator[LineEvent]:
        """Asynchronous stream of line events

        Returns:
            AsyncIterator[LineEvent]: the asynchronous stream of events
        """
        return async_event_stream(self.filenos())

    @property
    def min_line(self) -> int:
        """The smallest line number in the request

        Returns:
            int: The smallest line number in the request
        """
        # computed on first access and cached on the instance
        if not hasattr(self, "_min_line"):
            self._min_line = min(self.lines)
        return self._min_line

    @property
    def max_line(self) -> int:
        """The biggest line number in the request

        Returns:
            int: The biggest line number in the request
        """
        # computed on first access and cached on the instance
        if not hasattr(self, "_max_line"):
            self._max_line = max(self.lines)
        return self._max_line

    def filenos(self) -> list[int]:
        """List of underlying request file numbers

        Returns:
            list[int]: List of underlying request file numbers
        """
        return [request.fd for request in self.line_requests]

    def close(self):
        """Closes the underlying request files. If request is not
        open nothing is done.
        """
        for line_request in self.line_requests:
            line_request.close()

    def open(self):
        """Opens the underlying request files effectively reserving the lines

        If one of the underlying requests fails to open, the ones that were
        already opened are closed again before the error is propagated. This
        way a failed open never leaves file descriptors (and the lines they
        reserve) dangling.
        """
        try:
            for line_request in self.line_requests:
                line_request.open()
        except BaseException:
            # closing is a no-op for the requests that never got opened
            self.close()
            raise

    def get_values(self, lines: Optional[Sequence[int]] = None) -> dict[int, int]:
        """Reads values for the given lines

        Args:
            lines (Optional[Sequence[int]], optional): A collection of lines. Defaults to None. Default means read all lines

        Returns:
            dict[int, int]: line values. Key is line number and value its value
        """
        if lines is None:
            lines = self.lines
        # group the lines by the raw request that owns them so that each raw
        # request is queried only once
        request_lines = {}
        for line in lines:
            request_line = self.line_map[line]
            request_lines.setdefault(request_line, []).append(line)
        result = {}
        for request_line, local_lines in request_lines.items():
            result.update(request_line.get_values(local_lines))
        return result

    def set_values(self, values: dict[int, Union[int, bool]]):
        """Writes new values on the given lines

        Args:
            values (dict[int, Union[int, bool]]): key is line number and value its value
        """
        # group the values by the raw request that owns each line so that
        # each raw request is written only once
        request_lines = {}
        for line, value in values.items():
            request_line = self.line_map[line]
            request_lines.setdefault(request_line, {})[line] = value
        for request_line, local_lines in request_lines.items():
            request_line.set_values(local_lines)

504 

505 

class Device(BaseDevice):
    """A device represents a connection to the underlying gpio chip"""

    PREFIX = "/dev/gpiochip"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._watch_lines = set()

    def __len__(self) -> int:
        """The number of lines in this chip

        The value is read from the chip on first use and cached afterwards.

        Returns:
            int: The number of lines in this chip
        """
        try:
            return self._len
        except AttributeError:
            pass
        self._len = get_chip_info(self).lines
        return self._len

    def __getitem__(self, key: Union[int, tuple, slice]) -> Request:
        """create a request for the given lines. Equivalent to `device.request(key)`

        !!! note

            The request is not active after this call. You need to use the request object returned
            by this method in a context manager or manually call open/close.

        Args:
            key (Union[int, tuple, slice]): the line number, slice or a list of line numbers, or slices

        Returns:
            Request: A new request object
        """
        return self.request(expand_from_list(key, 0, len(self)))

    def __iter__(self) -> Iterable[LineInfoEvent]:
        """Stream of line config events

        Returns:
            Iterable[LineInfoEvent]: the stream of line config events
        """
        sources = (self,)
        return event_info_stream(sources)

    def __aiter__(self) -> AsyncIterator[LineInfoEvent]:
        """Asynchronous stream of line config events

        Returns:
            AsyncIterator[LineInfoEvent]: the asynchronous stream of line info events
        """
        sources = (self,)
        return async_event_info_stream(sources)

    def info_stream(self, lines: Collection[int]) -> Iterable[LineInfoEvent]:
        """Register for watching line config events on the given lines
        and stream them

        The lines are unregistered again when the stream finishes.

        Args:
            lines (Collection[int]): line numbers to watch for config events

        Returns:
            Iterable[LineInfoEvent]: the stream of line config events
        """
        with self.watching(lines):
            yield from iter(self)

    async def async_info_stream(self, lines: Collection[int]) -> AsyncIterator[LineInfoEvent]:
        """Register for watching line config events on the given lines
        and async stream them

        The lines are unregistered again when the stream finishes.

        Args:
            lines (Collection[int]): line numbers to watch for config events

        Returns:
            AsyncIterator[LineInfoEvent]: the asynchronous stream of line info events
        """
        with self.watching(lines):
            # make sure the inner stream is closed when this one is
            async with contextlib.aclosing(self.__aiter__()) as stream:
                async for event in stream:
                    yield event

    def get_info(self) -> Info:
        """
        Reads all information available including chip info and detailed information
        about each chip line

        Returns:
            Info: The full chip information
        """
        return get_info(self)

    def watch_line(self, line: int):
        """Register the given line for config events

        Args:
            line (int): the line number to register
        """
        watch_line_info(self, line)

    def unwatch_line(self, line: int):
        """Unregister the given line from config events

        Args:
            line (int): the line number to unregister
        """
        unwatch_line_info(self, line)

    def watch_lines(self, lines: Collection[int]):
        """Register the given lines for config events

        Args:
            lines (Collection[int]): the line numbers to register
        """
        for number in lines:
            self.watch_line(number)

    def unwatch_lines(self, lines: Collection[int]):
        """Unregister the given lines from config events

        Args:
            lines (Collection[int]): the line numbers to unregister
        """
        for number in lines:
            self.unwatch_line(number)

    @contextlib.contextmanager
    def watching(self, lines):
        """A context manager during which the given lines are registered
        for line config events

        Args:
            lines (Collection[int]): the line numbers to listen for config events
        """
        self.watch_lines(lines)
        try:
            yield
        finally:
            self.unwatch_lines(lines)

    def request(
        self, config: Optional[Union[Collection, list]] = None, blocking: Union[bool, object] = sentinel
    ) -> Request:
        """Create a request to reserve a list of lines on this chip

        !!! note

            The request is not active after this call. You need to use the request object returned
            by this method in a context manager or manually call open/close.

        Args:
            config (Optional[Union[Collection, list]], optional): lines configuration.
                It is interpreted by `parse_config`
            blocking (Union[bool, sentinel], optional): blocking mode. When not
                given, the blocking mode of this device is used

        Returns:
            Request: A new request object
        """
        mode = self.is_blocking if blocking is sentinel else blocking
        parsed = parse_config(config, len(self))
        return Request(self, parsed, mode)

665 

666 

def iter_gpio_files(path: PathLike = "/dev") -> Iterable[pathlib.Path]:
    """Returns an iterator over all GPIO chip files.

    !!! warning
        Only files for which the current user has read and write access are returned

    Args:
        path (PathLike, optional): root path. Defaults to "/dev".

    Returns:
        Iterable[pathlib.Path]: an iterator over the gpiochip files found on the system
    """
    # Match only GPIO character devices (gpiochipN, see Device.PREFIX). A
    # broader "gpio*" pattern would also pick up unrelated nodes that merely
    # share the prefix (e.g. /dev/gpiomem) and which do not speak the GPIO
    # chip ioctl interface.
    return iter_device_files(path=path, pattern="gpiochip*")

680 

681 

def iter_devices(path: PathLike = "/dev", **kwargs) -> Iterable[Device]:
    """Returns an iterator over all GPIO chip devices

    Args:
        path (PathLike, optional): root path. Defaults to "/dev".
        **kwargs: extra keyword arguments forwarded to each `Device`

    Returns:
        Iterable[Device]: an iterator over the gpiochip devices found on the system
    """
    filenames = iter_gpio_files(path=path)
    return (Device(filename, **kwargs) for filename in filenames)

692 

693 

# Finder of GPIO devices built on top of iter_devices.
find = make_find(
    iter_devices,
    needs_open=False,
)