Coverage for linuxpy/gpio/device.py: 99%

266 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-10-21 07:51 +0200

1# 

2# This file is part of the linuxpy project 

3# 

4# Copyright (c) 2024 Tiago Coutinho 

5# Distributed under the GPLv3 license. See LICENSE for more info. 

6 

7""" 

Human friendly interface to the Linux GPIO subsystem.

The heart of the linuxpy GPIO library is the [`Device`][linuxpy.gpio.device.Device]
class.
The recommended way is to use one of the find methods to create a Device object
and use it within a context manager like:

```python
from linuxpy.gpio.device import find, LineFlag

with find() as gpio:
    # request lines 5 and 6
    lines = gpio[5, 6]
    lines.flags = LineFlag.ACTIVE_LOW
    with lines:
        print(lines[:])

```

26""" 

27 

28import collections 

29import contextlib 

30import fcntl 

31import functools 

32import os 

33import pathlib 

34 

35from linuxpy.ctypes import sizeof, u32 

36from linuxpy.device import BaseDevice, ReentrantOpen, iter_device_files 

37from linuxpy.ioctl import ioctl 

38from linuxpy.types import AsyncIterator, Collection, FDLike, Iterable, Optional, PathLike, Sequence, Union 

39from linuxpy.util import ( 

40 aclosing, 

41 async_event_stream as async_events, 

42 bit_indexes, 

43 chunks, 

44 event_stream as events, 

45 index_mask, 

46 make_find, 

47 sentinel, 

48 sequence_indexes, 

49 to_fd, 

50) 

51 

52from . import raw 

53from .config import encode_request, parse_config 

54from .raw import IOC 

55 

# Plain records describing a chip and its lines.
Info = collections.namedtuple("Info", "name label lines")
ChipInfo = collections.namedtuple("ChipInfo", "name label lines")
LineInfo = collections.namedtuple("LineInfo", "name consumer line flags attributes")

# Enumerations of the raw module exposed under this module's namespace.
LineFlag = raw.LineFlag
LineAttrId = raw.LineAttrId
LineEventId = raw.LineEventId
LineChangedType = raw.LineChangedType

# Records produced when reading events (see `read_one_event` and
# `read_one_line_info_event`).
LineEvent = collections.namedtuple("LineEvent", "timestamp type line sequence line_sequence")
LineInfoEvent = collections.namedtuple("LineInfoEvent", "name consumer line flags attributes timestamp type")

66 

67 

class LineAttributes:
    """Mutable container for the optional attributes of a GPIO line.

    A new instance has no flags set, an empty list of indexes and no
    debounce period. `decode_line_info` overwrites these defaults with the
    attributes found in a raw line info structure.
    """

    def __init__(self):
        # Defaults kept when the corresponding attribute is not reported.
        self.flags = LineFlag(0)
        self.indexes = []
        self.debounce_period = None

73 

74 

def decode_line_info(info: raw.gpio_v2_line_info) -> LineInfo:
    """Translate a raw line info structure into a `LineInfo` tuple

    Only the first `info.num_attrs` entries of `info.attrs` are inspected.
    Attributes with an id other than FLAGS, OUTPUT_VALUES or DEBOUNCE are
    ignored.

    Args:
        info (raw.gpio_v2_line_info): the raw structure to decode

    Returns:
        LineInfo: decoded name, consumer, line offset, flags and attributes
    """
    decoded = LineAttributes()
    for attr in (info.attrs[idx] for idx in range(info.num_attrs)):
        attr_id = attr.id
        if attr_id == LineAttrId.FLAGS:
            decoded.flags = LineFlag(attr.flags)
        elif attr_id == LineAttrId.OUTPUT_VALUES:
            decoded.indexes = bit_indexes(attr.values)
        elif attr_id == LineAttrId.DEBOUNCE:
            # the raw field is expressed in microseconds; expose seconds
            decoded.debounce_period = attr.debounce_period_us / 1_000_000

    name = info.name.decode()
    consumer = info.consumer.decode()
    return LineInfo(name, consumer, info.offset, LineFlag(info.flags), decoded)

93 

94 

def get_chip_info(fd: FDLike) -> ChipInfo:
    """Query the chip information through the CHIPINFO ioctl

    Args:
        fd (FDLike): a gpiochip file number or file like object

    Returns:
        ChipInfo: name, label and number of lines of the chip behind `fd`
    """
    chip = raw.gpiochip_info()
    ioctl(fd, IOC.CHIPINFO, chip)
    return ChipInfo(
        name=chip.name.decode(),
        label=chip.label.decode(),
        lines=chip.lines,
    )

107 

108 

def get_line_info(fd: FDLike, line: int) -> LineInfo:
    """Query the information of a single line through the GET_LINEINFO ioctl

    Args:
        fd (FDLike): a gpiochip file number or file like object
        line (int): offset of the line to be queried

    Returns:
        LineInfo: decoded information for the given line of the chip
    """
    line_info = raw.gpio_v2_line_info(offset=line)
    ioctl(fd, IOC.GET_LINEINFO, line_info)
    decoded = decode_line_info(line_info)
    return decoded

122 

123 

def get_info(fd: FDLike) -> Info:
    """Read the full information of a chip: chip info plus one entry per line

    Args:
        fd (FDLike): a gpiochip file number or file like object

    Returns:
        Info: chip name, chip label and the list of `LineInfo` of all its lines
    """
    chip = get_chip_info(fd)
    lines = []
    # lines are numbered from 0 up to the line count reported by the chip
    for offset in range(chip.lines):
        lines.append(get_line_info(fd, offset))
    return Info(chip.name, chip.label, lines)

135 

136 

def raw_request(fd: FDLike, request: raw.gpio_v2_line_request, blocking=False):
    """Send an already encoded line request to the chip

    Args:
        fd (FDLike): a gpiochip file number or file like object
        request (raw.gpio_v2_line_request): the request passed to the GET_LINE ioctl
        blocking (bool, optional): when false (default) the file descriptor
            stored in `request.fd` is switched to non blocking mode

    Returns:
        raw.gpio_v2_line_request: the very same `request` object
    """
    ioctl(fd, IOC.GET_LINE, request)
    if blocking:
        return request
    # add O_NONBLOCK on top of the current file status flags
    status = fcntl.fcntl(request.fd, fcntl.F_GETFL)
    fcntl.fcntl(request.fd, fcntl.F_SETFL, status | os.O_NONBLOCK)
    return request

143 

144 

def request(fd: FDLike, config: dict, blocking: bool = False) -> raw.gpio_v2_line_request:
    """Reserve the line(s) described by the given configuration

    Args:
        fd (FDLike): a gpiochip file number or file like object
        config (dict): line config as specified in GPIO line configuration request
        blocking (bool, optional): Make the returned FD blocking. Defaults to False.

    Returns:
        raw.gpio_v2_line_request: The details of the request. Field `fd` contains the new open file descriptor
    """
    encoded = encode_request(config)
    # raw_request hands back the same object it receives
    return raw_request(fd, encoded, blocking=blocking)

160 

161 

def get_values(req_fd: FDLike, mask: int) -> raw.gpio_v2_line_values:
    """Read the values of the lines selected by `mask`

    Args:
        req_fd (FDLike): a file number or file like object
        mask (int): a bitmap identifying the lines, with each bit number corresponding to
                    the index of the line reserved in the given req_fd.

    Returns:
        raw.gpio_v2_line_values: the result of the LINE_GET_VALUES ioctl call
    """
    line_values = raw.gpio_v2_line_values(mask=mask)
    outcome = ioctl(req_fd, IOC.LINE_GET_VALUES, line_values)
    return outcome

175 

176 

def set_values(req_fd: FDLike, mask: int, bits: int) -> raw.gpio_v2_line_values:
    """Write the values of the lines selected by `mask`

    Args:
        req_fd (FDLike): a file number or file like object
        mask: a bitmap identifying the lines, with each bit number corresponding to
              the index of the line reserved in the given req_fd.
        bits: a bitmap containing the value of the lines, set to 1 for active and 0
              for inactive.

    Returns:
        raw.gpio_v2_line_values: the result of the LINE_SET_VALUES ioctl call
    """
    line_values = raw.gpio_v2_line_values(mask=mask, bits=bits)
    outcome = ioctl(req_fd, IOC.LINE_SET_VALUES, line_values)
    return outcome

193 

194 

def set_config(req_fd: FDLike, flags: LineFlag):
    """Apply a new set of flags through the LINE_SET_CONFIG ioctl

    Args:
        req_fd (FDLike): a file number or file like object
        flags (LineFlag): flags stored in the line config sent to the ioctl

    Returns:
        the result of the LINE_SET_CONFIG ioctl call
    """
    line_config = raw.gpio_v2_line_config(flags=flags)
    return ioctl(req_fd, IOC.LINE_SET_CONFIG, line_config)

198 

199 

def read_one_event(req_fd: FDLike) -> LineEvent:
    """Read one line event from the given request file descriptor

    A single `os.read` of the size of one raw line event structure is made
    and the bytes read are decoded.

    Args:
        req_fd (FDLike): a file number, or an object with a `fileno()` method

    Returns:
        LineEvent: the decoded event; `timestamp` is the raw nanosecond
        timestamp scaled by 1e-9
    """
    if isinstance(req_fd, int):
        fd = req_fd
    else:
        fd = req_fd.fileno()
    payload = os.read(fd, sizeof(raw.gpio_v2_line_event))
    raw_event = raw.gpio_v2_line_event.from_buffer_copy(payload)
    return LineEvent(
        timestamp=raw_event.timestamp_ns * 1e-9,
        type=LineEventId(raw_event.id),
        line=raw_event.offset,
        sequence=raw_event.seqno,
        line_sequence=raw_event.line_seqno,
    )

219 

220 

def watch_line_info(fd: FDLike, line: int):
    """Issue the GET_LINEINFO_WATCH ioctl for the given line

    Args:
        fd (FDLike): a gpiochip file number or file like object
        line (int): offset of the line

    Returns:
        the result of the ioctl call
    """
    line_info = raw.gpio_v2_line_info(offset=line)
    outcome = ioctl(fd, IOC.GET_LINEINFO_WATCH, line_info)
    return outcome

224 

225 

def unwatch_line_info(fd: FDLike, line: int):
    """Issue the LINEINFO_UNWATCH ioctl for the given line

    Args:
        fd (FDLike): a gpiochip file number or file like object
        line (int): offset of the line, sent to the ioctl as a `u32`

    Returns:
        the result of the ioctl call
    """
    offset = u32(line)
    return ioctl(fd, IOC.LINEINFO_UNWATCH, offset)

228 

229 

def read_one_line_info_event(fd: FDLike) -> LineInfoEvent:
    """Read one line info changed event from the given file descriptor

    A single `os.read` of the size of one raw line info changed structure is
    made and the bytes read are decoded.

    Args:
        fd (FDLike): a file number or file like object

    Returns:
        LineInfoEvent: the decoded line information plus the event timestamp
        (raw nanosecond timestamp scaled by 1e-9) and the kind of change
    """
    payload = os.read(to_fd(fd), sizeof(raw.gpio_v2_line_info_changed))
    changed = raw.gpio_v2_line_info_changed.from_buffer_copy(payload)
    line = decode_line_info(changed.info)
    timestamp = changed.timestamp_ns * 1e-9
    return LineInfoEvent(
        line.name,
        line.consumer,
        line.line,
        line.flags,
        line.attributes,
        timestamp,
        type=LineChangedType(changed.event_type),
    )

244 

245 

# Streams of line events: the generic (async) event stream helpers with the
# reader bound to `read_one_event`.
event_stream = functools.partial(events, read=read_one_event)
async_event_stream = functools.partial(async_events, read=read_one_event)

# Streams of line info changed events: same helpers, reader bound to
# `read_one_line_info_event`.
event_info_stream = functools.partial(events, read=read_one_line_info_event)
async_event_info_stream = functools.partial(async_events, read=read_one_line_info_event)

251 

252 

def expand_from_list(key: Union[int, slice, tuple], minimum, maximum) -> list[int]:
    """Used internally in __getitem__ to expand the given key

    Args:
        key (Union[int, slice, tuple]): a line number, a slice, or an iterable
            of line numbers, slices or nested iterables
        minimum: start used for slices which do not define one
        maximum: slices are resolved against `range(maximum)`

    Returns:
        list[int]: the flat list of line numbers, in the order given by key
    """
    if isinstance(key, int):
        return [key]
    if not isinstance(key, slice):
        # any other key is treated as an iterable of keys: flatten it
        expanded = []
        for item in key:
            expanded.extend(expand_from_list(item, minimum, maximum))
        return expanded
    first = key.start if key.start is not None else minimum
    return list(range(maximum)[slice(first, key.stop, key.step)])

262 

263 

class _Request(ReentrantOpen):
    """Raw line request. Not to be used directly

    Wraps a single line request made on a device: keeps the configuration,
    the file descriptor obtained when the request is open (-1 while closed)
    and the mapping produced by `sequence_indexes` for the requested lines.
    """

    def __init__(self, device: "Device", config: dict, blocking: bool = False):
        self.device = device
        self.config = config
        self.blocking = blocking
        self.line_indexes = sequence_indexes(entry["line"] for entry in config["lines"])
        # negative file descriptor means "not open"
        self.fd = -1
        super().__init__()

    @property
    def name(self) -> str:
        """Requestor name

        Changing the requestor name must be done before the request is open
        to take effect

        Returns:
            str: consumer name
        """
        return self.config["name"]

    @name.setter
    def name(self, name: str) -> None:
        """Set requestor name. Must be called before the request is open
        to take effect

        Args:
            name (str): new requestor name
        """
        self.config["name"] = name

    def fileno(self) -> int:
        """File descriptor of the request (-1 when the request is not open)"""
        return self.fd

    def close(self):
        """Close the request file descriptor. Does nothing if not open"""
        if self.fd >= 0:
            os.close(self.fd)
            self.fd = -1

    def open(self):
        """Make the request on the device and store the new file descriptor"""
        granted = request(self.device, self.config, self.blocking)
        self.fd: int = granted.fd

    def get_values(self, lines: Sequence[int]) -> dict[int, int]:
        """Read the values of the given lines

        Args:
            lines (Sequence[int]): line numbers; each must be part of this request

        Returns:
            dict[int, int]: key is line number and value is its value (0 or 1)
        """
        mask = index_mask(self.line_indexes, lines)
        values = get_values(self, mask)
        result = {}
        for line in lines:
            # the bit position of a line is its index inside the request
            result[line] = (values.bits >> self.line_indexes[line]) & 1
        return result

    def set_values(self, values: dict[int, Union[int, bool]]) -> raw.gpio_v2_line_values:
        """Write the given line values

        Args:
            values (dict[int, Union[int, bool]]): key is line number and value its value

        Returns:
            raw.gpio_v2_line_values: the result of the module level `set_values`
        """
        mask = bits = 0
        for line, value in values.items():
            bit = 1 << self.line_indexes[line]
            mask |= bit
            if value:
                bits |= bit
        return set_values(self, mask, bits)

328 

329 

class Request(ReentrantOpen):
    """A lazy request to reserve lines on a chip

    Preferred creation from the `Device.request()` method.

    The configured lines are split in groups of at most 64 lines and each
    group is served by its own raw request (`_Request`).
    """

    def __init__(self, device, config: dict, blocking: bool = False):
        self.config = config
        cfg = dict(config)
        lines = cfg.pop("lines")
        self.line_requests: list[_Request] = []
        # line number -> raw request serving that line
        self.line_map: dict[int, _Request] = {}
        for chunk in chunks(lines, 64):
            line_request = _Request(device, {**cfg, "lines": chunk}, blocking)
            self.line_requests.append(line_request)
            for line in chunk:
                self.line_map[line["line"]] = line_request
        super().__init__()

    def __len__(self):
        """The number of lines in this request"""
        return len(self.config["lines"])

    @property
    def lines(self):
        """List of the line numbers of this request, in configuration order"""
        return [line["line"] for line in self.config["lines"]]

    @property
    def name(self) -> str:
        """Requestor name

        Changing the requestor name must be done before the request is open
        to take effect

        Returns:
            str: consumer name
        """
        return self.config["name"]

    @name.setter
    def name(self, name: str) -> None:
        """Set requestor name. Must be called before the request is open
        to take effect

        Args:
            name (str): new requestor name
        """
        self.config["name"] = name
        for req in self.line_requests:
            req.name = name

    def __getitem__(self, key: Union[int, tuple, slice]) -> Union[int, dict]:
        """Reads lines values

        Args:
            key (Union[int, tuple, slice]): a line number, a slice, or a list of line numbers or slices

        Returns:
            Union[int, dict]: the line value if key is a line number, otherwise
            a dict where key is line number and value its value
        """
        if isinstance(key, int):
            return self.get_values((key,))[key]
        lines = expand_from_list(key, self.min_line, self.max_line + 1)
        return self.get_values(lines)

    def __setitem__(self, key: Union[int, tuple, slice], value: Union[int, Sequence[int]]):
        """Sets the given lines values

        Args:
            key (Union[int, tuple, slice]): a line number, a slice, or a list of line numbers or slices
            value (Union[int, Sequence[int]]): the value(s) to write for the given lines.
                A single int is applied to all the lines selected by key

        Raises:
            ValueError: if key is a line number and value is not a number (0 or 1)
        """
        if isinstance(key, int):
            if not isinstance(value, int):
                raise ValueError("set value for single line must be 0 or 1")
            values = {key: value}
        else:
            lines = expand_from_list(key, self.min_line, self.max_line + 1)
            if isinstance(value, int):
                value = len(lines) * (value,)
            values = dict(zip(lines, value))
        self.set_values(values)

    def __iter__(self) -> Iterable[LineEvent]:
        """Infinite stream of line events

        Returns:
            Iterable[LineEvent]: the stream of events
        """
        return event_stream(self.filenos())

    def __aiter__(self) -> AsyncIterator[LineEvent]:
        """Asynchronous stream of line events

        Returns:
            AsyncIterator[LineEvent]: the asynchronous stream of events
        """
        return async_event_stream(self.filenos())

    @property
    def min_line(self) -> int:
        """The smallest line number in the request

        Returns:
            int: The smallest line number in the request
        """
        # computed on first access and cached afterwards
        if not hasattr(self, "_min_line"):
            self._min_line = min(self.lines)
        return self._min_line

    @property
    def max_line(self) -> int:
        """The biggest line number in the request

        Returns:
            int: The biggest line number in the request
        """
        # computed on first access and cached afterwards
        if not hasattr(self, "_max_line"):
            self._max_line = max(self.lines)
        return self._max_line

    def filenos(self) -> list[int]:
        """List of underlying request file numbers

        Returns:
            list[int]: List of underlying request file numbers
        """
        return [request.fd for request in self.line_requests]

    def close(self):
        """Closes the underlying request files. If request is not
        open nothing is done.
        """
        for line_request in self.line_requests:
            line_request.close()

    def open(self):
        """Opens the underlying request files effectively reserving the lines

        If one of the underlying requests fails, the ones already open are
        closed before the error is propagated so no line stays reserved by
        a half open request.
        """
        try:
            for line_request in self.line_requests:
                line_request.open()
        except BaseException:
            # close() skips the requests which were not open yet
            self.close()
            raise

    def get_values(self, lines: Optional[Sequence[int]] = None) -> dict[int, int]:
        """Reads values for the given lines

        Args:
            lines (Optional[Sequence[int]], optional): A collection of lines. Defaults to None. Default means read all lines

        Returns:
            dict[int, int]: line values. Key is line number and value its value
        """
        if lines is None:
            lines = self.lines
        # group the lines by the raw request which serves them
        request_lines = {}
        for line in lines:
            request_line = self.line_map[line]
            request_lines.setdefault(request_line, []).append(line)
        result = {}
        for request_line, local_lines in request_lines.items():
            result.update(request_line.get_values(local_lines))
        return result

    def set_values(self, values: dict[int, Union[int, bool]]):
        """Writes new values on the given lines

        Args:
            values (dict[int, Union[int, bool]]): key is line number and value its value
        """
        # group the values by the raw request which serves each line
        request_lines = {}
        for line, value in values.items():
            request_line = self.line_map[line]
            request_lines.setdefault(request_line, {})[line] = value
        for request_line, local_lines in request_lines.items():
            request_line.set_values(local_lines)

505 

506 

class Device(BaseDevice):
    """A device represents a connection to the underlying gpio chip"""

    PREFIX = "/dev/gpiochip"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._watch_lines = set()

    def __len__(self) -> int:
        """The number of lines in this chip

        Returns:
            int: The number of lines in this chip
        """
        # the chip is queried on first use and the answer is cached
        if not hasattr(self, "_len"):
            self._len = get_chip_info(self).lines
        return self._len

    def __getitem__(self, key: Union[int, tuple, slice]) -> Request:
        """create a request for the given lines. Equivalent to `device.request(key)`

        !!! note

            The request is not active after this call. You need to use the request object returned
            by this method in a context manager or manually call open/close.

        Args:
            key (Union[int, tuple, slice]): the line number, slice or a list of line numbers, or slices

        Returns:
            Request: A new request object
        """
        lines = expand_from_list(key, 0, len(self))
        return self.request(lines)

    def __iter__(self) -> Iterable[LineInfoEvent]:
        """Infinite stream of line info events

        Returns:
            Iterable[LineInfoEvent]: the stream of line info events
        """
        return event_info_stream((self,))

    def __aiter__(self) -> AsyncIterator[LineInfoEvent]:
        """Asynchronous stream of line info events

        Returns:
            AsyncIterator[LineInfoEvent]: the asynchronous stream of line info events
        """
        return async_event_info_stream((self,))

    def info_stream(self, lines: Collection[int]) -> Iterable[LineInfoEvent]:
        """Stream of line info events for the given lines

        The lines are watched while the stream is being consumed and
        unwatched when it finishes.

        Args:
            lines (Collection[int]): lines to be watched

        Returns:
            Iterable[LineInfoEvent]: the stream of line info events
        """
        with self.watching(lines):
            yield from iter(self)

    async def async_info_stream(self, lines: Collection[int]) -> AsyncIterator[LineInfoEvent]:
        """Asynchronous stream of line info events for the given lines

        The lines are watched while the stream is being consumed and
        unwatched when it finishes.

        Args:
            lines (Collection[int]): lines to be watched

        Returns:
            AsyncIterator[LineInfoEvent]: the asynchronous stream of line info events
        """
        with self.watching(lines):
            async with aclosing(self.__aiter__()) as stream:
                async for event in stream:
                    yield event

    def get_info(self) -> Info:
        """
        Reads all information available including chip info and detailed information
        about each chip line information

        Returns:
            Info: The full chip information
        """
        return get_info(self)

    def watch_line(self, line: int):
        """Start watching the given line for line info changes"""
        watch_line_info(self, line)

    def unwatch_line(self, line: int):
        """Stop watching the given line for line info changes"""
        unwatch_line_info(self, line)

    def watch_lines(self, lines: Collection[int]):
        """Start watching each of the given lines for line info changes"""
        for line in lines:
            self.watch_line(line)

    def unwatch_lines(self, lines: Collection[int]):
        """Stop watching each of the given lines for line info changes"""
        for line in lines:
            self.unwatch_line(line)

    @contextlib.contextmanager
    def watching(self, lines):
        """Context manager which watches the given lines while it is active

        Only the lines which were successfully watched are unwatched on exit,
        including when watching one of the lines fails half way.

        Args:
            lines: the lines to be watched
        """
        # Remember what was actually watched: this keeps the cleanup correct
        # on partial failure and when `lines` can only be iterated once.
        watched = []
        try:
            for line in lines:
                self.watch_line(line)
                watched.append(line)
            yield
        finally:
            self.unwatch_lines(watched)

    def request(
        self, config: Optional[Union[Collection, list]] = None, blocking: Union[bool, object] = sentinel
    ) -> Request:
        """Create a request to reserve a list of lines on this chip

        !!! note

            The request is not active after this call. You need to use the request object returned
            by this method in a context manager or manually call open/close.

        Args:
            config (Optional[Union[Collection, list]], optional): lines configuration.
                It is normalized by `parse_config` together with the number of lines of the chip
            blocking (Union[bool, sentinel], optional): blocking mode. When not given
                the blocking mode of the device (`is_blocking`) is used

        Returns:
            Request: A new request object
        """
        if blocking is sentinel:
            blocking = self.is_blocking
        config = parse_config(config, len(self))
        return Request(self, config, blocking)

622 

623 

def iter_gpio_files(path: PathLike = "/dev") -> Iterable[pathlib.Path]:
    """Returns an iterator over all GPIO chip files.

    !!! warning
        Only files for which the current user has read and write access are returned

    Args:
        path (PathLike, optional): root path. Defaults to "/dev".

    Returns:
        Iterable[pathlib.Path]: an iterator over the gpiochip files found on the system
    """
    # Match only gpiochip nodes (same prefix as `Device.PREFIX`). A looser
    # "gpio*" pattern also picks up unrelated nodes such as /dev/gpiomem,
    # which are not GPIO character devices.
    return iter_device_files(path=path, pattern="gpiochip*")

637 

638 

def iter_devices(path: PathLike = "/dev", **kwargs) -> Iterable[Device]:
    """Returns an iterator over all GPIO chip devices

    Args:
        path (PathLike, optional): root path. Defaults to "/dev".
        **kwargs: extra keyword arguments forwarded to each `Device`

    Returns:
        Iterable[Device]: an iterator over the gpiochip devices found on the system
    """
    gpio_files = iter_gpio_files(path=path)
    # devices are created lazily, one per file, as the result is consumed
    return (Device(filename, **kwargs) for filename in gpio_files)

649 

650 

651find = make_find(iter_devices, needs_open=False)