Coverage for linuxpy/input/device.py: 75%

403 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-10-18 17:55 +0200

1# 

2# This file is part of the linuxpy project 

3# 

4# Copyright (c) 2023 Tiago Coutinho 

5# Distributed under the GPLv3 license. See LICENSE for more info. 

6 

7""" 

8Human friendly interface to linux Input subsystem. 

9 

10The heart of linuxpy input library is the [`Device`][linuxpy.input.device.Device] 

11class. 

12The recommended way is to use one of the find methods to create a Device object 

13and use it within a context manager like: 

14 

15```python 

16from linuxpy.input.device import find_gamepad 

17 

18with find_gamepad() as gamepad: 

19 print(f"Gamepad name: {gamepad.name}") 

20``` 

21""" 

22 

23import asyncio 

24import enum 

25import functools 

26import os 

27import pathlib 

28import select 

29 

30from linuxpy.ctypes import addressof, cint, create_string_buffer, cuint, i32, sizeof 

31from linuxpy.device import BaseDevice, iter_device_files 

32from linuxpy.ioctl import IO as _IO, IOR as _IOR, IOW as _IOW, IOWR as _IOWR, ioctl 

33from linuxpy.types import AsyncIterable, Callable, Iterable, Optional, PathLike, Sequence, Union 

34from linuxpy.util import Version, add_reader_asyncio, make_find 

35 

36from .raw import ( 

37 UIOC, 

38 Absolute, 

39 AutoRepeat, 

40 Bus, 

41 EventType, 

42 ForceFeedback, 

43 Key, 

44 Led, 

45 Miscelaneous, 

46 Relative, 

47 Sound, 

48 Switch, 

49 Synchronization, 

50 ff_effect, 

51 input_absinfo, 

52 input_event, 

53 input_id, 

54 input_mask, 

55 uinput_setup, 

56) 

57 

# Size in bytes of one `input_event` structure; `read_event` reads exactly
# this many bytes per event.
EVENT_SIZE = sizeof(input_event)

# Magic byte bound as first argument of the ioctl request builders below.
EVDEV_MAGIC = ord(b"E")

# ioctl request builders pre-bound to EVDEV_MAGIC.
IO = functools.partial(_IO, EVDEV_MAGIC)
IOR = functools.partial(_IOR, EVDEV_MAGIC)
IOW = functools.partial(_IOW, EVDEV_MAGIC)
IOWR = functools.partial(_IOWR, EVDEV_MAGIC)


# Buffer size (bytes) used by the string ioctls (name, physical location, uid).
_S_BUFF = 512

69 

70 

def _enum_max(enu):
    """Return the last member (in definition order) of an enum.

    Accepts either an enum class or one of its members.  The last member is
    assumed to be the ``*_MAX`` sentinel (ex: Key is KEY_MAX); nothing here
    verifies that assumption.
    """
    enum_cls = type(enu) if isinstance(enu, enum.Enum) else enu
    members = tuple(enum_cls)
    return members[-1]

76 

77 

def _enum_bit_size(enu):
    """Return how many bytes are needed to hold one bit per enum code.

    Computed as ``max // 8 + 1`` where ``max`` is the value returned by
    `_enum_max`.
    """
    highest = _enum_max(enu)
    return highest // 8 + 1

80 

81 

# Read ("get") ioctl request codes.  The second argument is the payload
# type or size handed to the request builder.
EVIOCGVERSION = IOR(0x01, cint)
EVIOCGID = IOR(0x02, input_id)
EVIOCGREP = IOR(0x03, 2 * cuint)
EVIOCGNAME = IOR(0x06, _S_BUFF)
EVIOCGPHYS = IOR(0x07, _S_BUFF)
EVIOCGUNIQ = IOR(0x08, _S_BUFF)
# EVIOCGPROP = _IOR(EVDEV_MAGIC, 0x09, _S_BUFF)
# State bitmaps: one bit per code of the corresponding enum.
EVIOCGKEY = IOR(0x18, _enum_bit_size(Key))
EVIOCGLED = IOR(0x19, _enum_bit_size(Led))
EVIOCGSND = IOR(0x1A, _enum_bit_size(Sound))
EVIOCGSW = IOR(0x1B, _enum_bit_size(Switch))

93 

94 

def EVIOCGMTSLOTS(nb_slots):
    """Build the read request (number 0x0A) sized for ``nb_slots + 1`` 32-bit integers."""
    payload_size = sizeof(i32) * (nb_slots + 1)
    return IOR(0x0A, payload_size)

97 

98 

def EVIOCGBIT(event_type_value, size):
    """Build the read request for the bitmap of an event type.

    The request number is ``0x20 + event_type_value``; *size* is the byte
    size of the bitmap buffer.
    """
    request_nr = 0x20 + event_type_value
    return IOR(request_nr, size)

101 

102 

def EVIOCGABS(abs_type_value):
    """Build the read request (``0x40 + abs_type_value``) sized for one `input_absinfo`."""
    request_nr = 0x40 + abs_type_value
    return IOR(request_nr, sizeof(input_absinfo))

105 

106 

def EVIOCSABS(abs_type_value):
    """Placeholder for the "set absolute info" request; always raises.

    Raises:
        NotImplementedError: unconditionally.
    """
    raise NotImplementedError

109 

110 

# Force-feedback and grab request codes.
EVIOCSFF = IOW(0x80, ff_effect)
EVIOCRMFF = IOW(0x81, cint)
EVIOCGEFFECTS = IOR(0x84, cint)
EVIOCGRAB = IOW(0x90, cint)

# Get / set the event mask (payload is an `input_mask` structure).
EVIOCGMASK = IOR(0x92, input_mask)
EVIOCSMASK = IOW(0x93, input_mask)

118 

119 

# Maps each event type to the enum holding the codes valid for that type.
# Used to size capability bitmaps and to decode `Event.code`.
EVENT_TYPE_MAP = {
    EventType.SYN: Synchronization,
    EventType.KEY: Key,
    EventType.REL: Relative,
    EventType.ABS: Absolute,
    EventType.MSC: Miscelaneous,
    EventType.SW: Switch,
    EventType.LED: Led,
    EventType.SND: Sound,
    EventType.REP: AutoRepeat,
    EventType.FF: ForceFeedback,
}

132 

133 

def grab(fd):
    """Issue the EVIOCGRAB ioctl on *fd* with argument 1 (grab the device)."""
    enable = 1
    ioctl(fd, EVIOCGRAB, enable)

136 

137 

def ungrab(fd):
    """Issue the EVIOCGRAB ioctl on *fd* with argument 0 (release the device)."""
    disable = 0
    ioctl(fd, EVIOCGRAB, disable)

140 

141 

def version(fd):
    """Return the integer filled in by the EVIOCGVERSION ioctl on *fd*."""
    raw_version = cint()
    ioctl(fd, EVIOCGVERSION, raw_version)
    return raw_version.value

146 

147 

def device_id(fd):
    """Return the `input_id` structure filled in by the EVIOCGID ioctl on *fd*."""
    identity = input_id()
    ioctl(fd, EVIOCGID, identity)
    return identity

152 

153 

def read_name(fd):
    """Return the device name (EVIOCGNAME ioctl) decoded to `str`."""
    name_buffer = create_string_buffer(_S_BUFF)
    ioctl(fd, EVIOCGNAME, name_buffer)
    raw_name = name_buffer.value
    return raw_name.decode()

158 

159 

def physical_location(fd):
    """Return the string reported by the EVIOCGPHYS ioctl, decoded to `str`."""
    phys_buffer = create_string_buffer(_S_BUFF)
    ioctl(fd, EVIOCGPHYS, phys_buffer)
    raw_phys = phys_buffer.value
    return raw_phys.decode()

164 

165 

def uid(fd):
    """Return the string reported by the EVIOCGUNIQ ioctl, decoded to `str`."""
    uniq_buffer = create_string_buffer(_S_BUFF)
    ioctl(fd, EVIOCGUNIQ, uniq_buffer)
    raw_uniq = uniq_buffer.value
    return raw_uniq.decode()

170 

171 

def _bit(array, bit):
    """Test bit number *bit* of the byte buffer *array*.

    Bits are numbered from the least significant bit of byte 0.

    Args:
        array: an indexable byte buffer.  Both buffers whose items are
            length-1 ``bytes`` (ctypes string buffers) and buffers whose
            items are ints (``bytes``, ``bytearray``) are accepted.
        bit: zero-based bit index (an int or int-valued enum member).

    Returns:
        The bit's mask within its byte (non-zero, hence truthy) when the bit
        is set, ``0`` otherwise.

    Raises:
        IndexError: if *bit* lies beyond the end of *array*.
    """
    byte = array[bit // 8]
    if not isinstance(byte, int):
        # ctypes char arrays yield length-1 bytes objects when indexed
        byte = ord(byte)
    return byte & (1 << (bit % 8))

174 

175 

def _active(fd, code, dtype):
    """Return the set of *dtype* members whose bit is set in a state bitmap.

    The bitmap is obtained by issuing ioctl request *code* on *fd* with a
    buffer of ``_enum_bit_size(dtype)`` bytes.
    """
    bitmap = create_string_buffer(_enum_bit_size(dtype))
    ioctl(fd, code, bitmap)
    active = set()
    for member in dtype:
        if _bit(bitmap, member):
            active.add(member)
    return active

180 

181 

def active_keys(fd):
    """Return the set of `Key` members flagged in the EVIOCGKEY bitmap of *fd*."""
    return _active(fd, code=EVIOCGKEY, dtype=Key)

184 

185 

def active_leds(fd):
    """Return the set of `Led` members flagged in the EVIOCGLED bitmap of *fd*."""
    return _active(fd, code=EVIOCGLED, dtype=Led)

188 

189 

def active_sounds(fd):
    """Return the set of `Sound` members flagged in the EVIOCGSND bitmap of *fd*."""
    return _active(fd, code=EVIOCGSND, dtype=Sound)

192 

193 

def active_switches(fd):
    """Return the set of `Switch` members flagged in the EVIOCGSW bitmap of *fd*."""
    return _active(fd, code=EVIOCGSW, dtype=Switch)

196 

197 

def abs_info(fd, abs_code):
    """Return the `input_absinfo` structure for *abs_code*, filled in via EVIOCGABS."""
    info = input_absinfo()
    request = EVIOCGABS(abs_code)
    ioctl(fd, request, info)
    return info

202 

203 

def available_event_types(fd):
    """Return the set of `EventType` members flagged in the type bitmap of *fd*.

    The bitmap is read with ``EVIOCGBIT(0, size)``.
    """
    size = _enum_bit_size(EventType)
    bitmap = create_string_buffer(size)
    ioctl(fd, EVIOCGBIT(0, size), bitmap)
    supported = set()
    for event_type in EventType:
        if _bit(bitmap, event_type):
            supported.add(event_type)
    return supported

209 

210 

def event_type_capabilities(fd, event_type):
    """Return the codes of *event_type* that the device behind *fd* advertises.

    Returns:
        - for ``EventType.SYN``: a list of every `Synchronization` member
          except the last one (the device is not queried);
        - for ``EventType.REP``: an empty list;
        - otherwise: the set of codes flagged in the EVIOCGBIT bitmap.

    Raises:
        KeyError: if *event_type* has no entry in `EVENT_TYPE_MAP`.
    """
    if event_type == EventType.SYN:
        # cannot query EventType.SYN so just return all possibilities
        every_sync_code = list(Synchronization)
        return every_sync_code[:-1]  # remove SYN_MAX
    if event_type == EventType.REP:
        # nothing in particular to report
        return []
    code_enum = EVENT_TYPE_MAP[event_type]
    size = _enum_bit_size(code_enum)
    bitmap = create_string_buffer(size)
    ioctl(fd, EVIOCGBIT(event_type, size), bitmap)
    return {code for code in code_enum if _bit(bitmap, code)}

223 

224 

def auto_repeat_settings(fd):
    """Return a dict mapping each `AutoRepeat` member to its EVIOCGREP value.

    The ioctl fills an array of two unsigned ints, indexed by the
    `AutoRepeat` member values.
    """
    values = (cuint * 2)()
    ioctl(fd, EVIOCGREP, values)
    settings = {}
    for setting in AutoRepeat:
        settings[setting] = values[setting]
    return settings

229 

230 

def capabilities(fd):
    """Return a dict mapping every available event type to its supported codes.

    Values come from `event_type_capabilities` (a list for SYN / REP, a set
    otherwise).
    """
    result = {}
    for event_type in available_event_types(fd):
        result[event_type] = event_type_capabilities(fd, event_type)
    return result

234 

235 

def get_input_mask(fd, event_type):
    """Query the event mask of *event_type* through the EVIOCGMASK ioctl.

    Returns:
        A ``(mask, buffer)`` tuple: the `input_mask` structure and the string
        buffer its ``codes_ptr`` points to.  Both are returned together
        because the structure only stores the buffer's address.

    Raises:
        KeyError: if *event_type* has no entry in `EVENT_TYPE_MAP`.
    """
    code_enum = EVENT_TYPE_MAP[event_type]
    size = _enum_bit_size(code_enum)
    codes_buffer = create_string_buffer(size)
    mask = input_mask()
    mask.type = event_type
    mask.codes_size = size
    mask.codes_ptr = addressof(codes_buffer)
    ioctl(fd, EVIOCGMASK, mask)
    return mask, codes_buffer

246 

247 

def read_event(fd, read=os.read):
    """Read exactly one `input_event` structure from *fd*.

    Args:
        fd: file descriptor handed to *read*.
        read: callable ``read(fd, nbytes) -> bytes``; defaults to `os.read`.

    Returns:
        An `input_event` copied from the bytes read.

    Raises:
        ValueError: if fewer than `EVENT_SIZE` bytes were returned
            (including an empty read).
    """
    data = read(fd, EVENT_SIZE)
    if len(data) < EVENT_SIZE:
        # Say how short the read was instead of raising a bare ValueError.
        raise ValueError(f"short read: expected {EVENT_SIZE} bytes for an input event, got {len(data)}")
    return input_event.from_buffer_copy(data)

253 

254 

def iter_input_files(path: PathLike = "/dev/input", pattern: str = "event*"):
    """Iterate over the device files under *path* whose name matches *pattern*.

    Thin wrapper around `iter_device_files`.
    """
    device_files = iter_device_files(path, pattern)
    return device_files

258 

259 

class InputError(Exception):
    """Error type for the input module."""

262 

263 

class _Type:
    """Base descriptor giving per-event-type access to a device.

    Subclasses set `_event_type`.  Declared as a class attribute of a device
    class, instance access returns a new object of the same class bound to
    that device.
    """

    # Event type handled by the concrete subclass.
    _event_type = None

    def __init__(self, device: Optional["Device"] = None):
        self.device = device

    def __get__(self, obj, type=None):
        """Bind the descriptor to *obj*.

        Returns:
            The descriptor itself when accessed on the owning class, otherwise
            a new instance of this class bound to *obj*.

        Raises:
            ValueError: if the device does not advertise `_event_type`.
        """
        if obj is None:
            # Class-level access: there is no device to inspect.
            return self
        if self._event_type not in obj.capabilities:
            name = EVENT_TYPE_MAP[self._event_type].__name__.lower()
            raise ValueError(f"device has no {name!r} capability")
        return self.__class__(obj)

    def _check_code(self, code):
        """Raise ValueError unless the bound device advertises *code*."""
        if code not in self.device.capabilities[self._event_type]:
            # Plain ints have no `.name`; fall back to the value itself.
            label = getattr(code, "name", code)
            raise ValueError(f"device has no {label!r} capability")

279 

280 

class _Abs(_Type):
    """Accessor for the current value of a device's absolute axes.

    Supports ``obj[Absolute.X]`` as well as attribute access by lower-case
    axis name (``obj.x``).
    """

    _event_type = EventType.ABS

    def __getitem__(self, code):
        """Return the current value of axis *code*.

        Raises:
            ValueError: if the device does not advertise *code*.
        """
        self._check_code(code)
        return self.device.get_abs_info(code).value

    def __getattr__(self, key):
        """Resolve ``obj.<axis>`` to the current value of that axis.

        Other code in this module refers to members without a prefix
        (``Absolute.X``), so the bare upper-cased name is tried first and the
        ``ABS_``-prefixed spelling second.

        Raises:
            AttributeError: if *key* names no `Absolute` member.
            ValueError: if the device does not advertise the axis.
        """
        upper = key.upper()
        for candidate in (upper, "ABS_" + upper):
            try:
                code = Absolute[candidate]
            except KeyError:
                continue
            return self[code]
        # No parent class defines __getattr__, so raise the standard error.
        raise AttributeError(f"{type(self).__name__!r} object has no attribute {key!r}")

    def __dir__(self):
        """List the lower-case axis names advertised by the device."""
        names = []
        for code in self.device.capabilities[self._event_type]:
            name = code.name
            if name.startswith("ABS_"):
                name = name[4:]
            names.append(name.lower())
        return names

297 

298 

class _Keys(_Type):
    """Accessor for the pressed state of a device's keys and buttons.

    Supports ``obj[Key.KEY_A]`` as well as attribute access by
    case-insensitive member name (``obj.key_a``).
    """

    _event_type = EventType.KEY

    def __dir__(self):
        """List the names of the key codes advertised by the device."""
        return [k.name for k in self.device.capabilities[self._event_type]]

    def __getitem__(self, code):
        """Return True if *code* is currently among the device's active keys.

        Raises:
            ValueError: if the device does not advertise *code*.
        """
        self._check_code(code)
        return code in self.device.active_keys

    def __getattr__(self, name):
        """Resolve ``obj.<key name>`` to the pressed state of that key.

        Raises:
            AttributeError: if *name* (upper-cased) names no `Key` member.
            ValueError: if the device does not advertise the key.
        """
        try:
            code = Key[name.upper()]
        except KeyError:
            # No parent class defines __getattr__, so raise the standard error.
            raise AttributeError(f"{type(self).__name__!r} object has no attribute {name!r}") from None
        return self[code]

314 

315 

class Event:
    """Event generated by an input device"""

    def __init__(self, event: input_event):
        # Raw `input_event` structure; every property below reads from it.
        self.event = event

    def __repr__(self):
        details = f"timestamp={self.timestamp} type={self.type.name} code={self.code.name} value={self.value}"
        return f"<{type(self).__name__} {details}>"

    @property
    def timestamp(self) -> float:
        """The timestamp associated with the event (seconds plus microseconds, as a float)"""
        stamp = self.event.time
        return stamp.secs + stamp.usecs * 1e-6

    @property
    def type(self) -> EventType:
        """The type of event"""
        raw_type = self.event.type
        return EventType(raw_type)

    @property
    def code(self):
        """The event code, as a member of the enum mapped to the event type"""
        code_enum = EVENT_TYPE_MAP[self.type]
        return code_enum(self.event.code)

    @property
    def value(self) -> int:
        """The event value"""
        return self.event.value

345 

346 

class Device(BaseDevice):
    """
    Central linux input subsystem class.

    You can create an instance directly if you know the device name:

    ```python
    from linuxpy.input.device import Device

    with Device("/dev/input/event11") as i11:
        print(i11.name)
    ```

    ... but it is generally easier to use the [`find`][linuxpy.input.device.find]
    helper to get a device with a certain condition. Example:

    ```python
    from linuxpy.input.device import find

    track_point = find(name="TPPS/2 Elan TrackPoint")
    ```
    """

    PREFIX = "/dev/input/event"

    # Descriptors: raise ValueError on instance access when the device lacks
    # the corresponding event type.
    absolute = _Abs()
    keys = _Keys()

    def __init__(self, *args, **kwargs):
        # Set before calling the base constructor so the attribute always exists.
        self._caps = None
        super().__init__(*args, **kwargs)

    def __iter__(self) -> Iterable[Event]:
        """
        Build an infinite iterator that streams input events.
        You'll need an open Device before using it:

        ```python
        from linuxpy.input.device import find_mouse

        with find_mouse() as mouse:
            for event in mouse:
                print(event)
        ```
        """
        yield from event_stream(self.fileno())

    async def __aiter__(self) -> AsyncIterable[Event]:
        """
        Build an infinite async iterator that streams input events.
        You'll need an open Device before using it:

        ```python
        import asyncio
        from linuxpy.input.device import find_mouse

        async def main():
            with find_mouse() as mouse:
                async for event in mouse:
                    print(event)

        asyncio.run(main())
        ```
        """
        async for event in async_event_stream(self.fileno()):
            yield event

    def _on_open(self):
        # Nothing to do when the device is opened.
        pass

    @functools.cached_property
    def uid(self) -> str:
        """The string reported by the EVIOCGUNIQ ioctl"""
        return uid(self.fileno())

    @functools.cached_property
    def name(self) -> str:
        """The device name"""
        return read_name(self.fileno())

    @functools.cached_property
    def version(self) -> Version:
        """The version"""
        return Version.from_number(version(self.fileno()))

    @functools.cached_property
    def physical_location(self) -> str:
        """The physical location"""
        return physical_location(self.fileno())

    @functools.cached_property
    def device_id(self) -> input_id:
        """The device input ID"""
        return device_id(self.fileno())

    @functools.cached_property
    def capabilities(self):
        """The device capabilities"""
        # `_caps` duplicates what cached_property already stores; it is kept
        # because the attribute is initialised in __init__.
        if self._caps is None:
            self._caps = capabilities(self.fileno())
        return self._caps

    @property
    def active_keys(self):
        """All active keys at the moment of calling this"""
        return active_keys(self.fileno())

    def get_abs_info(self, abs_code):
        """Absolute information for the given abs code"""
        return abs_info(self.fileno(), abs_code)

    @property
    def x(self):
        """Current absolute X value"""
        return self.get_abs_info(Absolute.X).value

    @property
    def y(self):
        """Current absolute Y value"""
        return self.get_abs_info(Absolute.Y).value

    @property
    def z(self):
        """Current absolute Z value"""
        return self.get_abs_info(Absolute.Z).value

    @property
    def rx(self):
        """Current value of the absolute RX axis"""
        return self.get_abs_info(Absolute.RX).value

    @property
    def ry(self):
        """Current value of the absolute RY axis"""
        return self.get_abs_info(Absolute.RY).value

    @property
    def rz(self):
        """Current value of the absolute RZ axis"""
        return self.get_abs_info(Absolute.RZ).value

    def read_event(self):
        """
        Read event.
        Event must be available to read or otherwise will raise an error
        """
        return Event(read_event(self.fileno()))

    def grab(self):
        """Grab the device for exclusive use"""
        # Pass the descriptor, like every other method of this class.
        grab(self.fileno())

    def ungrab(self):
        """Release (ungrab) the device"""
        ungrab(self.fileno())

501 

502 

class Grab:
    """
    Context manager which grabs the device on enter and releases (ungrabs)
    it on exit.

    The device should be open for operation before the object is called on
    the `with` statement.

    This context manager is reusable but **not** reentrant and **not**
    thread safe.

    Example:

    ```python
    from linuxpy.input.device import find_mouse, Grab
    with find_mouse() as mouse:
        with Grab(mouse):
            print(mouse.active_keys)
    ```
    """

    def __init__(self, device: "Device"):
        self.device = device

    def __enter__(self):
        """Grab the device and return this context manager."""
        self.device.grab()
        # Returning self makes `with Grab(dev) as g:` bind something usable.
        return self

    def __exit__(self, *_):
        """Ungrab the device unless it has been closed in the meantime."""
        if not self.device.closed:
            self.device.ungrab()

533 

534 

def event_stream(fd) -> Iterable[Event]:
    """Yield events from *fd* forever.

    Each iteration waits in ``select`` until *fd* is readable, then reads one
    event.
    """
    watched = (fd,)
    while True:
        select.select(watched, (), ())
        raw_event = read_event(fd)
        yield Event(raw_event)

539 

540 

async def async_event_stream(fd, maxsize: int = 1000) -> AsyncIterable[Event]:
    """Asynchronously yield events from *fd* forever.

    A reader callback registered through `add_reader_asyncio` pushes each raw
    event into a queue bounded by *maxsize*; the generator awaits that queue.
    """
    pending = asyncio.Queue(maxsize=maxsize)

    def on_readable():
        # put_nowait raises if the queue is full.
        return pending.put_nowait(read_event(fd))

    with add_reader_asyncio(fd, on_readable):
        while True:
            raw_event = await pending.get()
            yield Event(raw_event)

546 

547 

def event_batch_stream(fd) -> Iterable[Sequence[Event]]:
    """Yields packets of events occurring at the same moment in time.

    Non-SYN events accumulate until a ``Synchronization.REPORT`` arrives, at
    which point the packet is yielded.  A ``Synchronization.DROPPED``
    discards the packet collected so far.  SYN events are never included.
    """
    batch = []
    for event in event_stream(fd):
        if event.type != EventType.SYN:
            batch.append(event)
            continue
        sync_code = event.code
        if sync_code == Synchronization.REPORT:
            yield batch
            batch = []
        elif sync_code == Synchronization.DROPPED:
            batch = []

560 

561 

async def async_event_batch_stream(fd, maxsize: int = 1000) -> AsyncIterable[Sequence[Event]]:
    """Yields packets of events occurring at the same moment in time.

    Asynchronous counterpart of `event_batch_stream`; *maxsize* is forwarded
    to `async_event_stream`.
    """
    batch = []
    async for event in async_event_stream(fd, maxsize=maxsize):
        if event.type != EventType.SYN:
            batch.append(event)
            continue
        sync_code = event.code
        if sync_code == Synchronization.REPORT:
            yield batch
            batch = []
        elif sync_code == Synchronization.DROPPED:
            batch = []

574 

575 

def iter_devices(path: PathLike = "/dev/input", **kwargs) -> Iterable[Device]:
    """Lazily build a `Device` for each input file found under *path*.

    Extra keyword arguments are forwarded to the `Device` constructor.
    """
    device_files = iter_input_files(path=path)
    return (Device(device_file, **kwargs) for device_file in device_files)

578 

579 

def is_gamepad(device: Device) -> bool:
    """Return True if *device* has ABS events and the ``BTN_GAMEPAD`` key.

    The device is used as a context manager while its capabilities are read.
    """
    with device:
        caps = device.capabilities
        if EventType.ABS not in caps:
            return False
        return Key.BTN_GAMEPAD in caps.get(EventType.KEY, ())

585 

586 

def is_keyboard(device: Device) -> bool:
    """Return True if *device* advertises both ``KEY_A`` and ``KEY_CAPSLOCK``.

    The device is used as a context manager while its capabilities are read.
    """
    with device:
        key_caps = device.capabilities.get(EventType.KEY, ())
        if Key.KEY_A not in key_caps:
            return False
        return Key.KEY_CAPSLOCK in key_caps

592 

593 

def is_mouse(device: Device) -> bool:
    """Return True if *device* has ABS or REL events and the ``BTN_MOUSE`` key.

    The device is used as a context manager while its capabilities are read.
    """
    with device:
        caps = device.capabilities
        has_pointer_axes = EventType.ABS in caps or EventType.REL in caps
        if not has_pointer_axes:
            return False
        return Key.BTN_MOUSE in caps.get(EventType.KEY, ())

601 

602 

# Generic finder built over every input device.
_find = make_find(iter_devices)


def find(
    find_all: bool = False, custom_match: Optional[Callable] = None, **kwargs
) -> Union[Device, Iterable[Device], None]:
    """
    Search the input devices.

    With `find_all=False` (the default):

    Return a device following the criteria given by custom_match and kwargs,
    or None if no device matches.
    Without criteria, an arbitrary first device is returned.

    With `find_all=True`:

    Return an iterator over all devices that match custom_match and kwargs.
    The iterator is empty if nothing matches.
    Without criteria, it iterates over all input devices found on the system.
    """
    return _find(find_all, custom_match, **kwargs)

624 

625 

def _make_find_input(func):
    """Build a finder that only accepts devices for which *func* is true.

    The returned callable has the same signature as `find`.  A caller
    supplied `custom_match` is combined with *func*: both must accept the
    device.
    """

    def _find(find_all=False, custom_match=None, **kwargs):
        if not custom_match:
            predicate = func
        else:

            def predicate(dev):
                return func(dev) and custom_match(dev)

        return find(find_all=find_all, custom_match=predicate, **kwargs)

    return _find

637 

638 

# Finder restricted to devices accepted by `is_gamepad`.
_find_gamepad = _make_find_input(is_gamepad)


def find_gamepad(
    find_all: bool = False, custom_match: Optional[Callable] = None, **kwargs
) -> Union[Device, Iterable[Device], None]:
    """
    Search the gamepad devices.

    With `find_all=False` (the default):

    Return a gamepad following the criteria given by custom_match and kwargs,
    or None if no device matches.
    Without criteria, an arbitrary first gamepad is returned.

    With `find_all=True`:

    Return an iterator over all gamepads that match custom_match and kwargs.
    The iterator is empty if nothing matches.
    Without criteria, it iterates over all gamepads found on the system.
    """
    return _find_gamepad(find_all, custom_match, **kwargs)

660 

661 

# Finder restricted to devices accepted by `is_keyboard`.
_find_keyboard = _make_find_input(is_keyboard)


def find_keyboard(
    find_all: bool = False, custom_match: Optional[Callable] = None, **kwargs
) -> Union[Device, Iterable[Device], None]:
    """
    Search the keyboard devices.

    With `find_all=False` (the default):

    Return a keyboard following the criteria given by custom_match and kwargs,
    or None if no device matches.
    Without criteria, an arbitrary first keyboard is returned.

    With `find_all=True`:

    Return an iterator over all keyboards that match custom_match and kwargs.
    The iterator is empty if nothing matches.
    Without criteria, it iterates over all keyboards found on the system.
    """
    return _find_keyboard(find_all, custom_match, **kwargs)

683 

684 

# Finder restricted to devices accepted by `is_mouse`.
_find_mouse = _make_find_input(is_mouse)


def find_mouse(
    find_all: bool = False, custom_match: Optional[Callable] = None, **kwargs
) -> Union[Device, Iterable[Device], None]:
    """
    Search the mouse devices.

    With `find_all=False` (the default):

    Return a mouse following the criteria given by custom_match and kwargs,
    or None if no device matches.
    Without criteria, an arbitrary first mouse is returned.

    With `find_all=True`:

    Return an iterator over all mice that match custom_match and kwargs.
    The iterator is empty if nothing matches.
    Without criteria, it iterates over all mice found on the system.
    """
    return _find_mouse(find_all, custom_match, **kwargs)

706 

707 

def is_uinput_available():
    """Return True if the path in `BaseUDevice.PATH` exists."""
    uinput_path = BaseUDevice.PATH
    return uinput_path.exists()

710 

711 

def u_device_setup(fd, bus: Bus, vendor: int, product: int, name: Union[str, bytes]) -> None:
    """Send the identity of a uinput device through the ``UIOC.DEV_SETUP`` ioctl.

    Args:
        fd: descriptor of the uinput device.
        bus: bus type stored in ``setup.id.bustype``.
        vendor: vendor id stored in ``setup.id.vendor``.
        product: product id stored in ``setup.id.product``.
        name: device name.  It is limited to 80 *bytes* after encoding.

    The limit is applied to the encoded bytes rather than to the characters
    of a `str`, because a non-ASCII name of 80 characters encodes to more
    than 80 bytes.
    NOTE(review): assumes the 80 limit is the byte size of ``setup.name`` - confirm.
    """
    max_name_size = 80
    setup = uinput_setup()
    setup.id.bustype = bus
    setup.id.vendor = vendor
    setup.id.product = product
    if isinstance(name, str):
        encoded = name.encode()
        if len(encoded) > max_name_size:
            # Drop a trailing multi-byte character cut in half by the slice.
            encoded = encoded[:max_name_size].decode(errors="ignore").encode()
        name = encoded
    else:
        name = name[:max_name_size]
    setup.name = name
    ioctl(fd, UIOC.DEV_SETUP, setup)

722 

723 

def u_device_create(fd):
    """Issue the ``UIOC.DEV_CREATE`` ioctl on *fd*."""
    request = UIOC.DEV_CREATE
    ioctl(fd, request)

726 

727 

def u_device_destroy(fd):
    """Issue the ``UIOC.DEV_DESTROY`` ioctl on *fd*."""
    request = UIOC.DEV_DESTROY
    ioctl(fd, request)

730 

731 

def u_set_event(fd, event_type: EventType):
    """Issue the ``UIOC.SET_EVBIT`` ioctl on *fd* for *event_type*."""
    request = UIOC.SET_EVBIT
    ioctl(fd, request, event_type)

734 

735 

def _u_set_n(fd, ioc, values):
    """Issue ioctl request *ioc* on *fd* once per item of *values*, in order."""
    pending = iter(values)
    for item in pending:
        ioctl(fd, ioc, item)

739 

740 

def u_set_keys(fd, *keys: Key):
    """Issue ``UIOC.SET_KEYBIT`` on *fd* for each of the given key codes."""
    request = UIOC.SET_KEYBIT
    _u_set_n(fd, request, keys)

743 

744 

def u_set_relatives(fd, *relatives: Relative):
    """Issue ``UIOC.SET_RELBIT`` on *fd* for each of the given relative codes."""
    request = UIOC.SET_RELBIT
    _u_set_n(fd, request, relatives)

747 

748 

def u_set_absolutes(fd, *absolutes: Absolute):
    """Issue ``UIOC.SET_ABSBIT`` on *fd* for each of the given absolute codes."""
    request = UIOC.SET_ABSBIT
    _u_set_n(fd, request, absolutes)

751 

752 

def u_set_miscellaneous(fd, *misc: Miscelaneous):
    """Issue ``UIOC.SET_MSCBIT`` on *fd* for each of the given miscellaneous codes."""
    request = UIOC.SET_MSCBIT
    _u_set_n(fd, request, misc)

755 

756 

def u_set_force_feedback(fd, *ff: ForceFeedback):
    """Issue ``UIOC.SET_FFBIT`` on *fd* for each of the given force-feedback codes."""
    request = UIOC.SET_FFBIT
    _u_set_n(fd, request, ff)

759 

760 

def u_emit(fd, event_type, event_code, value, syn=True):
    """Write one input event to *fd*, optionally followed by a SYN report.

    Args:
        fd: descriptor the raw `input_event` bytes are written to.
        event_type: stored in the event's ``type`` field.
        event_code: stored in the event's ``code`` field.
        value: stored in the event's ``value`` field.
        syn: when true, a second event of type ``EventType.SYN`` with
            ``Synchronization.REPORT`` as both code and value is written
            right after.
    """
    event = input_event()
    event.type = event_type
    event.code = event_code
    event.value = value
    payload = bytes(event)
    os.write(fd, payload)
    if not syn:
        return
    u_emit(fd, EventType.SYN, Synchronization.REPORT, Synchronization.REPORT, syn=False)

769 

770 

class BaseUDevice(BaseDevice):
    """A uinput device with no capabilities registered"""

    PATH = pathlib.Path("/dev/uinput")
    # Maps event type -> collection of codes; subclasses override it.
    CAPABILITIES = {}

    def __init__(
        self,
        filename=PATH,
        bus=Bus.VIRTUAL,
        vendor_id=0x01,
        product_id=0x01,
        name="linuxpy emulated device",
    ):
        # Force write only
        super().__init__(filename, read_write="w")
        self.bustype = bus
        self.vendor_id = vendor_id
        self.product_id = product_id
        self.name = name

    def _on_open(self):
        # Opening the file registers the capabilities and creates the device.
        self.setup()
        self.create()

    def _on_close(self):
        self.destroy()

    def setup(self):
        """Register `CAPABILITIES`, then send the device identity."""
        self.set_capabilities(self.CAPABILITIES)
        u_device_setup(self.fileno(), self.bustype, self.vendor_id, self.product_id, self.name)

    def set_capabilities(self, caps: dict):
        """Enable every event type in *caps* together with its codes.

        Codes are registered only for KEY, ABS, REL, MSC and FF; other event
        types are enabled without codes.
        """
        code_setters = (
            (EventType.KEY, u_set_keys),
            (EventType.ABS, u_set_absolutes),
            (EventType.REL, u_set_relatives),
            (EventType.MSC, u_set_miscellaneous),
            (EventType.FF, u_set_force_feedback),
        )
        for event_type, codes in caps.items():
            u_set_event(self.fileno(), event_type)
            for handled_type, set_codes in code_setters:
                if event_type == handled_type:
                    set_codes(self.fileno(), *codes)
                    break

    def create(self):
        """Create the uinput device."""
        u_device_create(self.fileno())

    def destroy(self):
        """Destroy the uinput device."""
        u_device_destroy(self.fileno())

    def emit(self, event_type: EventType, event_code: int, value: int, syn=True):
        """Emit one event through `u_emit`; *syn* adds a trailing SYN report."""
        descriptor = self.fileno()
        return u_emit(descriptor, event_type, event_code, value, syn=syn)

825 

826 

class UMouse(BaseUDevice):
    """uinput device preset with three buttons and two relative axes."""

    # Left / middle / right buttons plus relative X and Y.
    CAPABILITIES = {
        EventType.KEY: {Key.BTN_LEFT, Key.BTN_MIDDLE, Key.BTN_RIGHT},
        EventType.REL: {Relative.X, Relative.Y},
    }

832 

833 

class UGamepad(BaseUDevice):
    """uinput device preset with gamepad buttons, five absolute axes and MSC scan."""

    CAPABILITIES = {
        # EventType.FF: {ForceFeedback.RUMBLE, ForceFeedback.PERIODIC, ForceFeedback.SQUARE, ForceFeedback.TRIANGLE, ForceFeedback.SINE, ForceFeedback.GAIN},
        EventType.KEY: {
            # NOTE(review): BTN_EAST used to be listed twice; the duplicate was
            # a no-op in a set literal and is removed.  If BTN_SOUTH was
            # intended, confirm whether it is distinct from BTN_GAMEPAD in `Key`.
            Key.BTN_GAMEPAD,
            Key.BTN_EAST,
            Key.BTN_NORTH,
            Key.BTN_WEST,
            Key.BTN_SELECT,
            Key.BTN_START,
            Key.BTN_TL,
            Key.BTN_TR,
            Key.BTN_TL2,
            Key.BTN_TR2,
            Key.BTN_MODE,
            Key.BTN_THUMBL,
            Key.BTN_THUMBR,
            Key.BTN_DPAD_UP,
            Key.BTN_DPAD_DOWN,
            Key.BTN_DPAD_LEFT,
            Key.BTN_DPAD_RIGHT,
        },
        EventType.ABS: {Absolute.X, Absolute.Y, Absolute.RX, Absolute.RY, Absolute.RZ},
        EventType.MSC: {Miscelaneous.SCAN},
    }