Coverage for linuxpy/input/cli.py: 0%

62 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2025-05-27 13:54 +0200

1# 

2# This file is part of the linuxpy project 

3# 

4# Copyright (c) 2023 Tiago Coutinho 

5# Distributed under the GPLv3 license. See LICENSE for more info. 

6 

7import argparse 

8import asyncio 

9 

10from linuxpy.input.device import Bus, Device, async_event_batch_stream, event_batch_stream, find 

11 

12 

def ls(_):
    """Print a table with one row per input device.

    Devices come from ``find(find_all=True)`` and are listed in ascending
    ``index`` order. Each device is opened (via its context manager) only for
    the time needed to read its attributes.

    Args:
        _: Ignored. Present so the function can be called with the parsed
            command line namespace like the other sub-commands.
    """
    print(f"{'Name':32} {'Bus':10} {'Location':32} {'Version':8} {'Filename':32}")
    for dev in sorted(find(find_all=True), key=lambda d: d.index):
        with dev:
            # Reading the physical location may fail with OSError; show a
            # placeholder instead of aborting the whole listing.
            try:
                physical_location = dev.physical_location
            except OSError:
                physical_location = "-"
            did = dev.device_id
            # A bus type that is not a member of Bus raises ValueError.
            try:
                bus = Bus(did.bustype).name
            except ValueError:
                bus = "-"
            print(f"{dev.name:<32} {bus:<10} {physical_location:<32} {dev.version:<8} {dev.filename}")

27 

28 

def print_event(device, event):
    """Print one event as a single fixed-width row.

    Args:
        device: Object providing ``index`` and ``name``.
        event: Object providing ``type.name``, ``code.name`` and ``value``.
    """
    print(f"{device.index:2} {device.name:32} {event.type.name:6} {event.code.name:16} {event.value}")

31 

32 

def listen(args):
    """Print every event read from a single input device until interrupted.

    Args:
        args: Parsed command line namespace; ``args.addr`` is passed to
            ``Device.from_id`` to select the device.
    """
    print(f" # {'Name':32} {'Type':6} {'Code':16} {'Value':6}")
    with Device.from_id(args.addr) as device:
        # The stream yields batches of events; print each event on its own row.
        for batch in event_batch_stream(device.fileno()):
            for event in batch:
                print_event(device, event)

39 

40 

async def async_listen(args):
    """Print events from several input devices concurrently.

    One reader task is started per entry of ``args.addr``. Each reader pushes
    the event batches it receives into a shared queue, and this coroutine
    drains the queue and prints the events.

    If a reader fails (for example the device cannot be opened), the
    exception is forwarded through the queue and re-raised here. Without
    that, the failure would stay hidden inside the task and this coroutine
    would wait on the queue forever.

    Args:
        args: Parsed command line namespace; ``args.addr`` is an iterable of
            ids, each passed to ``Device.from_id``.

    Raises:
        Exception: The first exception raised by any reader task.
    """
    print(f" # {'Name':32} {'Type':6} {'Code':16} {'Value':6}")
    queue = asyncio.Queue()

    async def go(addr):
        # Queue items are (device, batch, error); error is None for data.
        try:
            with Device.from_id(addr) as device:
                async for batch in async_event_batch_stream(device.fileno()):
                    await queue.put((device, batch, None))
        except Exception as error:
            # Hand the failure to the consumer instead of losing it in the task.
            await queue.put((None, None, error))

    # Keep references to the tasks so they are not garbage collected.
    readers = [asyncio.create_task(go(addr)) for addr in args.addr]
    try:
        while True:
            device, batch, error = await queue.get()
            if error is not None:
                raise error
            for event in batch:
                print_event(device, event)
    finally:
        # Stop the remaining readers when leaving (error or cancellation).
        for reader in readers:
            reader.cancel()

56 

57 

def cli():
    """Build the command line parser.

    The parser requires one sub-command, stored in ``command``:

    * ``listen`` (alias ``dump``): one integer ``addr``.
    * ``alisten`` (alias ``adump``): one or more integer ``addr`` values.
    * ``ls``: no arguments.

    Returns:
        argparse.ArgumentParser: The configured parser.
    """
    parser = argparse.ArgumentParser()
    sub_parsers = parser.add_subparsers(
        title="sub-commands",
        description="valid sub-commands",
        help="select one command",
        required=True,
        dest="command",
    )
    # Named *_parser so the locals do not shadow the module-level functions.
    listen_parser = sub_parsers.add_parser("listen", aliases=["dump"], help="listen for events on selected input")
    listen_parser.add_argument("addr", help="address", type=int)
    alisten_parser = sub_parsers.add_parser(
        "alisten", aliases=["adump"], help="listen for events on selected input(s)"
    )
    alisten_parser.add_argument("addr", help="address(es)", type=int, nargs="+")
    sub_parsers.add_parser("ls", help="list inputs")
    return parser

69 

70 

def run(args):
    """Execute the sub-command selected on the command line.

    ``args.command`` holds the sub-command name exactly as typed, so both the
    primary names and their aliases are checked. An unrecognised command does
    nothing.

    Args:
        args: Parsed command line namespace with a ``command`` attribute.
    """
    if args.command in {"listen", "dump"}:
        listen(args)
    elif args.command in {"alisten", "adump"}:
        asyncio.run(async_listen(args))
    elif args.command == "ls":
        ls(args)

78 

79 

def main(args=None):
    """Parse the command line and run the selected sub-command.

    Args:
        args: Argument list handed to ``parse_args``. ``None`` (the default)
            makes argparse read ``sys.argv``.
    """
    parser = cli()
    args = parser.parse_args(args=args)
    try:
        run(args)
    except KeyboardInterrupt:
        # "\r" returns to the start of the line before printing the message.
        print("\rCtrl-C pressed. Bailing out")

87 

88 

# Run the command line interface only when executed as a script.
if __name__ == "__main__":
    main()