Coverage for linuxpy/midi/cli.py: 0%

79 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2025-05-27 13:54 +0200

1# 

2# This file is part of the linuxpy project 

3# 

4# Copyright (c) 2023 Tiago Coutinho 

5# Distributed under the GPLv3 license. See LICENSE for more info. 

6 

7import argparse 

8import asyncio 

9 

10from linuxpy.midi.device import ( 

11 EVENT_TYPE_INFO, 

12 EventType, 

13 PortCapability, 

14 PortType, 

15 Sequencer, 

16 iter_read_clients, 

17 iter_read_ports, 

18 struct_text, 

19) 

20 

21 

22def event_text(event): 

23 data = EVENT_TYPE_INFO.get(event.type) 

24 if data is None: 

25 return event.type.name 

26 name, member_name = data 

27 result = f"{event.client_id:>3}:{event.port_id:<3} {name:<20} " 

28 if event.type == EventType.SYSEX: 

29 result += " ".join(f"{i:02X}" for i in event.raw_data) 

30 elif event.type == EventType.CLOCK: 

31 queue_ctrl = event.queue_ctrl 

32 real_time = queue_ctrl.param.time.time 

33 timestamp = real_time.tv_sec + real_time.tv_nsec * 1e-9 

34 result += f"queue={queue_ctrl.queue} {timestamp=}" 

35 elif member_name: 

36 member = getattr(event.event.data, member_name) 

37 result += struct_text(member) 

38 return result 

39 

40 

41def listen(seq, args): 

42 for addr in args.addr: 

43 port = seq.create_port(f"listen on {addr}") 

44 port.connect_from(addr) 

45 for event in seq: 

46 print(event_text(event)) 

47 

48 

49async def async_listen(seq, args): 

50 for addr in args.addr: 

51 port = seq.create_port(f"listen on {addr}") 

52 port.connect_from(addr) 

53 async for event in seq: 

54 print(event_text(event)) 

55 

56 

57def iter_all_ports(seq): 

58 for client in iter_read_clients(seq): 

59 yield from iter_read_ports(seq, client.client) 

60 

61 

62def ls(seq, _): 

63 print(f"{'Port':^7} {'Client':<24} {'Port':<24} {'Type':<30} {'Capabilities'}") 

64 for client in iter_read_clients(seq): 

65 cname = client.name.decode() 

66 for port in iter_read_ports(seq, client.client): 

67 pname = port.name.decode() 

68 capability = PortCapability(port.capability) 

69 caps = str(capability).split(".", 1)[-1] 

70 caps = caps.replace("SUBS_", "S").replace("READ", "R").replace("WRITE", "W").replace("|", ", ") 

71 ptype = str(PortType(port.type)).split(".", 1)[-1].replace("|", ", ") 

72 print(f"{port.addr.client:3}:{port.addr.port:<3} {cname:<24} {pname:<24} {ptype:<30} {caps}") 

73 

74 

75def cli(seq): 

76 ports = {(port.addr.client, port.addr.port) for port in iter_all_ports(seq)} 

77 

78 def address(text): 

79 client, port = text.split(":", 1) 

80 addr = int(client), int(port) 

81 if addr not in ports: 

82 raise ValueError(f"Port {text} not found") 

83 return addr 

84 

85 parser = argparse.ArgumentParser() 

86 sub_parsers = parser.add_subparsers( 

87 title="sub-commands", description="valid sub-commands", help="select one command", required=True, dest="command" 

88 ) 

89 listen = sub_parsers.add_parser("listen", aliases=["dump"], help="listen for events on selected port(s)") 

90 listen.add_argument("addr", help="address(es)", type=address, nargs="+") 

91 alisten = sub_parsers.add_parser("alisten", aliases=["adump"], help="listen for events on selected port(s)") 

92 alisten.add_argument("addr", help="address(es)", type=address, nargs="+") 

93 sub_parsers.add_parser("ls", help="list clients and ports") 

94 return parser 

95 

96 

97def run(seq, args): 

98 if args.command in {"listen", "dump"}: 

99 listen(seq, args) 

100 elif args.command in {"alisten", "adump"}: 

101 asyncio.run(async_listen(seq, args)) 

102 elif args.command == "ls": 

103 ls(seq, args) 

104 

105 

106def main(args=None): 

107 with Sequencer("linuxpy midi cli") as seq: 

108 parser = cli(seq) 

109 args = parser.parse_args(args=args) 

110 try: 

111 run(seq, args) 

112 except KeyboardInterrupt: 

113 print("\rCtrl-C pressed. Bailing out") 

114 

115 

116if __name__ == "__main__": 

117 main()