Coverage for linuxpy/input/cli.py: 0%
62 statements
« prev ^ index » next coverage.py v7.6.8, created at 2025-05-27 13:54 +0200
« prev ^ index » next coverage.py v7.6.8, created at 2025-05-27 13:54 +0200
1#
2# This file is part of the linuxpy project
3#
4# Copyright (c) 2023 Tiago Coutinho
5# Distributed under the GPLv3 license. See LICENSE for more info.
7import argparse
8import asyncio
10from linuxpy.input.device import Bus, Device, async_event_batch_stream, event_batch_stream, find
13def ls(_):
14 print(f"{'Name':32} {'Bus':10} {'Location':32} {'Version':8} {'Filename':32}")
15 for dev in sorted(find(find_all=True), key=lambda d: d.index):
16 with dev:
17 try:
18 physical_location = dev.physical_location
19 except OSError:
20 physical_location = "-"
21 did = dev.device_id
22 try:
23 bus = Bus(did.bustype).name
24 except ValueError:
25 bus = "-"
26 print(f"{dev.name:<32} {bus:<10} {physical_location:<32} {dev.version:<8} {dev.filename}")
29def print_event(device, event):
30 print(f"{device.index:2} {device.name:32} {event.type.name:6} {event.code.name:16} {event.value}")
33def listen(args):
34 print(f" # {'Name':32} {'Type':6} {'Code':16} {'Value':6}")
35 with Device.from_id(args.addr) as device:
36 for batch in event_batch_stream(device.fileno()):
37 for event in batch:
38 print_event(device, event)
41async def async_listen(args):
42 print(f" # {'Name':32} {'Type':6} {'Code':16} {'Value':6}")
43 queue = asyncio.Queue()
45 async def go(addr):
46 with Device.from_id(addr) as device:
47 async for event in async_event_batch_stream(device.fileno()):
48 await queue.put((device, event))
50 _ = [asyncio.create_task(go(addr)) for addr in args.addr]
52 while True:
53 device, batch = await queue.get()
54 for event in batch:
55 print_event(device, event)
58def cli():
59 parser = argparse.ArgumentParser()
60 sub_parsers = parser.add_subparsers(
61 title="sub-commands", description="valid sub-commands", help="select one command", required=True, dest="command"
62 )
63 listen = sub_parsers.add_parser("listen", aliases=["dump"], help="listen for events on selected input")
64 listen.add_argument("addr", help="address", type=int)
65 alisten = sub_parsers.add_parser("alisten", aliases=["adump"], help="listen for events on selected input(s)")
66 alisten.add_argument("addr", help="address(es)", type=int, nargs="+")
67 sub_parsers.add_parser("ls", help="list inputs")
68 return parser
71def run(args):
72 if args.command in {"listen", "dump"}:
73 listen(args)
74 elif args.command in {"alisten", "adump"}:
75 asyncio.run(async_listen(args))
76 elif args.command == "ls":
77 ls(args)
80def main(args=None):
81 parser = cli()
82 args = parser.parse_args(args=args)
83 try:
84 run(args)
85 except KeyboardInterrupt:
86 print("\rCtrl-C pressed. Bailing out")
89if __name__ == "__main__":
90 main()