Coverage for linuxpy/midi/device.py: 34%

503 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-10-18 17:55 +0200

1# 

2# This file is part of the linuxpy project 

3# 

4# Copyright (c) 2023 Tiago Coutinho 

5# Distributed under the GPLv3 license. See LICENSE for more info. 

6 

7""" 

8Human friendly interface to linux MIDI subsystem. 

9 

10The heart of linuxpy MIDI library is the [`Sequencer`][linuxpy.midi.device.Sequencer] 

11class. 

12Usually you need only one instance of Sequencer for your application. 

13The recommended way is to use it within a context manager like: 

14 

15```python 

16with Sequencer("My MIDI App") as midi: 

17 print(f"MIDI version: {midi.version}") 

18``` 

19 

20which is roughly equivalent to: 

21 

22```python 

23midi = Sequencer("My MIDI App") 

24midi.open() 

25try: 

26 print(f"MIDI version: {midi.version}") 

27finally: 

28 midi.close() 

29``` 

30 

31Here's a real world example: 

32 

33```python 

34from linuxpy.midi.device import Sequencer 

35 

36with Sequencer("My MIDI App") as midi: 

37 print(f"I'm client {midi.client_id}") 

38 print(f"MIDI version: {midi.version}") 

39 port = midi.create_port() 

40 port.connect_from((0, 1)) 

41 for event in midi: 

42 print(event) 

43``` 

44""" 

45 

46import asyncio 

47import contextlib 

48import errno 

49import itertools 

50from enum import IntEnum 

51 

52from linuxpy import ctypes 

53from linuxpy.ctypes import Struct, cint, cvoidp, sizeof 

54from linuxpy.device import DEV_PATH, BaseDevice 

55from linuxpy.ioctl import ioctl 

56from linuxpy.midi.raw import ( 

57 IOC, 

58 ClientType, 

59 EventLength, 

60 EventType, 

61 PortCapability, 

62 PortType, 

63 TimeMode, 

64 TimeStamp, 

65 snd_seq_addr, 

66 snd_seq_client_info, 

67 snd_seq_event, 

68 snd_seq_port_info, 

69 snd_seq_port_subscribe, 

70 snd_seq_query_subs, 

71 snd_seq_queue_client, 

72 snd_seq_queue_info, 

73 snd_seq_queue_status, 

74 snd_seq_running_info, 

75 snd_seq_system_info, 

76) 

77from linuxpy.types import AsyncIterable, Iterable, Optional, Sequence, Union 

78from linuxpy.util import Version, add_reader_asyncio 

79 

80ALSA_PATH = DEV_PATH / "snd" 

81SEQUENCER_PATH = ALSA_PATH / "seq" 

82 

83 

84class RealtimeStatusCode(IntEnum): 

85 REALTIME = 0xF0 

86 SYSEX_START = 0xF0 

87 MIDI_TIME_CODE = 0xF1 

88 SONG_POSITION = 0xF2 

89 SONG_SELECT = 0xF3 

90 TUNE_REQUEST = 0xF6 

91 SYSEX_END = 0xF7 

92 TIMING_CLOCK = 0xF8 

93 START = 0xFA 

94 CONTINUE = 0xFB 

95 STOP = 0xFC 

96 ACTIVE_SENSING = 0xFE 

97 RESET = 0xFF 

98 

99 

100#: unknown source 

101ADDRESS_UNKNOWN = 253 

102 

103#: direct dispatch 

104QUEUE_DIRECT = 253 

105 

106#: send event to all subscribed ports 

107ADDRESS_SUBSCRIBERS = 254 

108 

109#: send event to all queues/clients/ports/channels 

110ADDRESS_BROADCAST = 255 

111 

112SUBSCRIBERS = (ADDRESS_SUBSCRIBERS, ADDRESS_UNKNOWN) 

113BROADCAST = (ADDRESS_BROADCAST, ADDRESS_BROADCAST) 

114 

115#: size of event (bytes) 

116EVENT_SIZE = sizeof(snd_seq_event) 

117 

118#: READ + SUBSCRIBE READ port capabilities 

119INPUT = PortCapability.READ | PortCapability.SUBS_READ 

120 

121#: WRITE + SUBSCRIBE WRITE port capabilities 

122OUTPUT = PortCapability.WRITE | PortCapability.SUBS_WRITE 

123 

124#: full READ + WRITE port capabilities 

125INPUT_OUTPUT = INPUT | OUTPUT 

126 

127 

128PortAddress = Union[int, "Port"] 

129FullPortAddress = Union[snd_seq_addr, tuple[int, PortAddress]] 

130EventT = Union[str, int, EventType] 

131FullPortAddresses = Sequence[FullPortAddress] 

132 

133 

134class MidiError(Exception): 

135 """MIDI error""" 

136 

137 

138def read_pversion(seq) -> int: 

139 return ioctl(seq, IOC.PVERSION, cint()).value 

140 

141 

142def read_system_info(seq) -> snd_seq_system_info: 

143 return ioctl(seq, IOC.SYSTEM_INFO, snd_seq_system_info()) 

144 

145 

146def read_running_mode(seq) -> snd_seq_running_info: 

147 return ioctl(seq, IOC.RUNNING_MODE, snd_seq_running_info()) 

148 

149 

150def read_client_id(seq) -> int: 

151 return ioctl(seq, IOC.CLIENT_ID, cint()).value 

152 

153 

154def read_client_info(seq, client_id: int) -> snd_seq_client_info: 

155 return ioctl(seq, IOC.GET_CLIENT_INFO, snd_seq_client_info(client=client_id)) 

156 

157 

158def write_client_info(seq, client: snd_seq_client_info): 

159 return ioctl(seq, IOC.SET_CLIENT_INFO, client) 

160 

161 

162def next_client(seq, client_id: int) -> Optional[snd_seq_client_info]: 

163 with contextlib.suppress(FileNotFoundError): 

164 return ioctl(seq, IOC.QUERY_NEXT_CLIENT, snd_seq_client_info(client=client_id)) 

165 

166 

167def iter_read_clients(seq) -> Iterable[snd_seq_client_info]: 

168 next_client_id = -1 

169 while client := next_client(seq, next_client_id): 

170 yield client 

171 next_client_id = client.client 

172 

173 

174def read_port_info(seq, client_id: int, port_id: int) -> snd_seq_port_info: 

175 port = snd_seq_port_info(client=client_id) 

176 port.addr.client = client_id 

177 port.addr.port = port_id 

178 return ioctl(seq, IOC.GET_PORT_INFO, port) 

179 

180 

181def next_port(seq, client_id: int, port_id: int) -> Optional[snd_seq_port_info]: 

182 port = snd_seq_port_info(client=client_id) 

183 port.addr.client = client_id 

184 port.addr.port = port_id 

185 with contextlib.suppress(FileNotFoundError): 

186 return ioctl(seq, IOC.QUERY_NEXT_PORT, port) 

187 

188 

189def iter_read_ports(seq, client_id: int) -> Iterable[snd_seq_port_info]: 

190 next_port_id = -1 

191 while port := next_port(seq, client_id, next_port_id): 

192 yield port 

193 next_port_id = port.addr.port 

194 

195 

196def create_port(seq, port: snd_seq_port_info): 

197 return ioctl(seq, IOC.CREATE_PORT, port) 

198 

199 

200def delete_port(seq, port: snd_seq_port_info): 

201 return ioctl(seq, IOC.DELETE_PORT, port) 

202 

203 

204def subscribe(seq, src: FullPortAddress, dest: FullPortAddress): 

205 subs = snd_seq_port_subscribe() 

206 subs.sender = to_address(src) 

207 subs.dest = to_address(dest) 

208 return ioctl(seq, IOC.SUBSCRIBE_PORT, subs) 

209 

210 

211def unsubscribe(seq, src: FullPortAddress, dest: FullPortAddress): 

212 subs = snd_seq_port_subscribe() 

213 subs.sender = to_address(src) 

214 subs.dest = to_address(dest) 

215 return ioctl(seq, IOC.UNSUBSCRIBE_PORT, subs) 

216 

217 

218def iter_read_subscribers(seq, client_id: int, port_id: int): 

219 for i in itertools.count(): 

220 value = snd_seq_query_subs() 

221 value.root.client = client_id 

222 value.root.port = port_id 

223 value.index = i 

224 ioctl(seq, IOC.QUERY_SUBS, value) 

225 if value.index == value.num_subs: 

226 break 

227 yield value 

228 if value.index + 1 == value.num_subs: 

229 break 

230 

231 

232def create_queue(seq) -> snd_seq_queue_info: 

233 return ioctl(seq, IOC.CREATE_QUEUE, snd_seq_queue_info()) 

234 

235 

236def delete_queue(seq, queue: int) -> snd_seq_queue_info: 

237 return ioctl(seq, IOC.DELETE_QUEUE, snd_seq_queue_info(queue=queue)) 

238 

239 

240def write_queue_info(seq, queue: snd_seq_queue_info): 

241 return ioctl(seq, IOC.SET_QUEUE_INFO, queue) 

242 

243 

244def read_queue_status(seq, queue: int) -> snd_seq_queue_status: 

245 return ioctl(seq, IOC.GET_QUEUE_STATUS, snd_seq_queue_status(queue=queue)) 

246 

247 

248def read_queue_client(seq, queue: int) -> snd_seq_queue_client: 

249 return ioctl(seq, IOC.GET_QUEUE_CLIENT, snd_seq_queue_client(queue=queue)) 

250 

251 

252def to_address(addr: FullPortAddress) -> snd_seq_addr: 

253 """Convert to low level snd_seq_addr""" 

254 if isinstance(addr, snd_seq_addr): 

255 return addr 

256 return snd_seq_addr(*map(int, addr)) 

257 

258 

259def to_event_type(event_type: EventT) -> EventType: 

260 if isinstance(event_type, EventType): 

261 return event_type 

262 elif isinstance(event_type, int): 

263 return EventType(event_type) 

264 else: 

265 try: 

266 event_type = event_type.upper().replace(" ", "_") 

267 return EventType[event_type] 

268 except KeyError: 

269 event_type = event_type.replace("_", "").upper() 

270 return EventType[event_type] 

271 

272 

273class Sequencer(BaseDevice): 

274 """ 

275 Central MIDI class. 

276 

277 ```python 

278 from linuxpy.midi.device import Sequencer 

279 

280 with Sequencer("My MIDI App") as midi: 

281 print(f"I'm client {midi.client_id}") 

282 print(f"MIDI version: {midi.version}") 

283 port = midi.create_port() 

284 port.connect_from((0, 1)) 

285 for event in midi: 

286 print(event) 

287 ``` 

288 """ 

289 

290 def __init__(self, name: str = "linuxpy client", **kwargs): 

291 self.name = name 

292 self.client_id = -1 

293 self.version: Optional[Version] = None 

294 self._local_ports = set() 

295 self.subscriptions = set() 

296 super().__init__(SEQUENCER_PATH, **kwargs) 

297 

298 def __iter__(self) -> Iterable["Event"]: 

299 """ 

300 Build an infinite iterator that streams MIDI events from the 

301 subscribed ports. 

302 You'll need an open sequencer before using it: 

303 

304 ```python 

305 from linuxpy.midi.device import Sequencer 

306 

307 with Sequencer() as midi: 

308 port = midi.create_port() 

309 port.connect_from((0, 1)) 

310 for event in midi: 

311 print(event) 

312 ``` 

313 """ 

314 return event_stream(self) 

315 

316 async def __aiter__(self) -> AsyncIterable["Event"]: 

317 """ 

318 Build an infinite async iterator that streams MIDI events from the 

319 subscribed ports. 

320 You'll need an open sequencer before using it: 

321 

322 ```python 

323 import asyncio 

324 from linuxpy.midi.device import Sequencer 

325 

326 async def main(): 

327 with Sequencer() as midi: 

328 port = midi.create_port() 

329 port.connect_from((0, 1)) 

330 async for event in midi: 

331 print(event) 

332 

333 asyncio.run(main()) 

334 ``` 

335 """ 

336 async for event in async_event_stream(self, maxsize=10): 

337 yield event 

338 

339 def _on_open(self): 

340 self.client_id = read_client_id(self) 

341 client_info = read_client_info(self, self.client_id) 

342 self.version = Version.from_number(read_pversion(self)) 

343 client_info.name = self.name.encode() 

344 write_client_info(self, client_info) 

345 

346 def _on_close(self): 

347 # TODO: delete all open ports 

348 for port in set(self._local_ports): 

349 self.delete_port(port) 

350 

351 @property 

352 def client_info(self) -> snd_seq_client_info: 

353 """Current Client information""" 

354 return read_client_info(self, self.client_id) 

355 

356 @property 

357 def client(self) -> "Client": 

358 """Current Client information""" 

359 return self.get_client(self.client_id) 

360 

361 @property 

362 def running_mode(self) -> snd_seq_running_info: 

363 """Current running mode""" 

364 return read_running_mode(self) 

365 

366 @property 

367 def system_info(self) -> snd_seq_system_info: 

368 """Current system information""" 

369 return read_system_info(self) 

370 

371 def get_client(self, client_id: int) -> "Client": 

372 """ 

373 Returns a Client for the given ID or raises an error if the client 

374 doesn't exist. 

375 It returns new Client object each time 

376 """ 

377 info = read_client_info(self, client_id) 

378 return Client(self, info) 

379 

380 @property 

381 def iter_clients(self) -> Iterable["Client"]: 

382 """An iterator over all open clients on the system. It returns new Client each time""" 

383 return (Client(self, client_info) for client_info in iter_read_clients(self)) 

384 

385 @property 

386 def clients(self) -> Sequence["Client"]: 

387 """Returns a new list of all clients on the system""" 

388 return list(self.iter_clients) 

389 

390 @property 

391 def iter_ports(self) -> Iterable["Port"]: 

392 """ 

393 An iterator over all open ports on the system. 

394 It returns new Port objects each time 

395 """ 

396 for client in self.iter_clients: 

397 for port in iter_read_ports(self, client.client_id): 

398 yield Port(self, port) 

399 

400 @property 

401 def ports(self) -> Sequence["Port"]: 

402 """ 

403 Returns a new list of all open ports on the system. 

404 It returns new Port objects each time 

405 """ 

406 return list(self.iter_ports) 

407 

408 def get_port(self, address: FullPortAddress) -> "Port": 

409 """ 

410 Returns a Port for the given address or raises an error if the port 

411 doesn't exist. 

412 It returns new Port object each time 

413 """ 

414 port_address = to_address(address) 

415 info = read_port_info(self, port_address.client, port_address.port) 

416 return Port(self, info) 

417 

418 def create_port( 

419 self, 

420 name: str = "linuxpy port", 

421 capabilities: PortCapability = INPUT_OUTPUT, 

422 port_type: PortType = PortType.MIDI_GENERIC | PortType.APPLICATION, 

423 ) -> "Port": 

424 """ 

425 Create a new local port. By default it will create a MIDI generic 

426 application Input/Output port. 

427 """ 

428 port_info = snd_seq_port_info() 

429 port_info.name = name.encode() 

430 port_info.addr.client = self.client_id 

431 port_info.capability = capabilities 

432 port_info.type = port_type 

433 port_info.midi_channels = 16 

434 port_info.midi_voices = 64 

435 port_info.synth_voices = 0 

436 create_port(self, port_info) 

437 port = Port(self, port_info) 

438 self._local_ports.add(port_info.addr.port) 

439 return port 

440 

441 def delete_port(self, port: Union[int, "Port"]): 

442 """ 

443 Delete a previously created local port. If the port has any 

444 subscriptions they will be closed before the port is deleted 

445 """ 

446 if isinstance(port, int): 

447 addr = self.client_id, port 

448 port_info = snd_seq_port_info(addr=snd_seq_addr(client=self.client_id, port=port)) 

449 else: 

450 port_info = port.info 

451 if port_info.addr.client != self.client_id: 

452 raise MidiError("Cannot delete non local port") 

453 addr = port_info.addr.client, port_info.addr.port 

454 # unsubscribe first 

455 for uid in set(self.subscriptions): 

456 if addr in {uid[0:2], uid[2:4]}: 

457 self.unsubscribe(uid[0:2], uid[2:4]) 

458 self._local_ports.remove(addr[1]) 

459 delete_port(self, port_info) 

460 

461 def subscribe(self, src: FullPortAddress, dest: FullPortAddress): 

462 """ 

463 Subscribe a source port to a destination port 

464 """ 

465 src = to_address(src) 

466 dest = to_address(dest) 

467 uid = (src.client, src.port, dest.client, dest.port) 

468 self.subscriptions.add(uid) 

469 subscribe(self, src, dest) 

470 

471 def unsubscribe(self, src: FullPortAddress, dest: FullPortAddress): 

472 """ 

473 Unsubscribe a previously subscribed source port to a destination port 

474 """ 

475 src = to_address(src) 

476 dest = to_address(dest) 

477 uid = (src.client, src.port, dest.client, dest.port) 

478 self.subscriptions.remove(uid) 

479 try: 

480 unsubscribe(self, src, dest) 

481 except OSError as error: 

482 if error.errno == errno.ENXIO: 

483 self.log.info("Could not delete port (maybe device was unplugged)") 

484 else: 

485 raise 

486 

487 def iter_raw_read(self, max_nb_packets: int = 64) -> Iterable["Event"]: 

488 """ 

489 Read list of pending events. If the sequencer is opened in blocking 

490 mode and there are no events it blocks until at least one event occurs 

491 otherwise as OSError is raised. 

492 

493 Use the `read()` call instead because it handles blocking vs 

494 non-blocking variants transperently. 

495 """ 

496 payload = self._fobj.read(max_nb_packets * EVENT_SIZE) 

497 nb_packets = len(payload) // EVENT_SIZE 

498 i = 0 

499 while i < nb_packets: 

500 start = i * EVENT_SIZE 

501 packet = payload[start : start + EVENT_SIZE] 

502 event = Event(snd_seq_event.from_buffer_copy(packet)) 

503 i += 1 

504 if event.is_variable_length_type: 

505 size = event.event.data.ext.len 

506 nb = (size + EVENT_SIZE - 1) // EVENT_SIZE 

507 event.data = payload[i * EVENT_SIZE : i * EVENT_SIZE + size][1:-1] 

508 i += nb 

509 yield event 

510 

511 def raw_read(self, max_nb_packets=64) -> Sequence["Event"]: 

512 """ 

513 Read list of pending events. If there are no events it blocks until at 

514 least one event occurs and returns it. 

515 

516 Use the `read()` call instead because it handles blocking vs 

517 non-blocking variants transperently. 

518 """ 

519 return tuple(self.iter_raw_read(max_nb_packets=max_nb_packets)) 

520 

521 def wait_read(self) -> Sequence["Event"]: 

522 """ 

523 Read list of pending events. If there are no events it blocks until at 

524 least one event occurs and returns it. 

525 This method assumes the internal file descriptior was opened in 

526 non-blocking mode. 

527 

528 Use the `read()` call instead because it handles blocking vs 

529 non-blocking variants transperently 

530 """ 

531 if self.io.select is not None: 

532 self.io.select((self,), (), ()) 

533 return self.raw_read() 

534 

535 def read(self) -> Sequence["Event"]: 

536 """ 

537 Read list of pending events. If there are no events it blocks until at 

538 least one event occurs and returns it 

539 """ 

540 # first time we check what mode device was opened (blocking vs non-blocking) 

541 if self.is_blocking: 

542 self.read = self.raw_read 

543 else: 

544 self.read = self.wait_read 

545 return self.read() 

546 

547 def write(self, event: "Event"): 

548 """Send an event message""" 

549 self._fobj.write(bytes(event)) 

550 

551 def send( 

552 self, 

553 port: PortAddress, 

554 event_type: Union[str, int, EventType], 

555 queue: int = QUEUE_DIRECT, 

556 to: Union[FullPortAddress, FullPortAddresses] = SUBSCRIBERS, 

557 **kwargs, 

558 ): 

559 """ 

560 Send a message of the given type from a specific port to the destination 

561 address(es). Use kwargs to pass specific event arguments like velocity in 

562 a "note on" event. 

563 

564 event_type can be an instance of EventType or the equivalent number or 

565 a case insensitive string matching the event type (ex: "noteon", "NOTEON", 

566 "note-on" or "note on"). 

567 

568 The following example sends "note on" with velocity 45 on port 0 of client 14: 

569 

570 ```python 

571 midi.send((14, 0), "note on", velocity=45) 

572 ``` 

573 """ 

574 event = Event.new(event_type, **kwargs) 

575 event.queue = queue 

576 event.source = to_address((self.client_id, port)) 

577 if isinstance(to, Sequence) and isinstance(to[0], Sequence): 

578 to_list = to 

579 else: 

580 to_list = (to,) 

581 for dest in to_list: 

582 event.dest = to_address(dest) 

583 self.write(event) 

584 

585 

586class Client: 

587 """ 

588 MIDI sequencer client. Don't instantiate this object directly 

589 Use instead `Sequencer.get_client()` 

590 """ 

591 

592 def __init__(self, sequencer: Sequencer, client: snd_seq_client_info): 

593 self.sequencer = sequencer 

594 self.info = client 

595 

596 def __int__(self): 

597 "The client ID" 

598 return self.client_id 

599 

600 @property 

601 def name(self) -> str: 

602 "Client name" 

603 return self.info.name.decode() 

604 

605 @property 

606 def client_id(self): 

607 return self.info.client 

608 

609 @property 

610 def type(self): 

611 return ClientType(self.info.type) 

612 

613 @property 

614 def is_local(self) -> bool: 

615 """ 

616 True if the client was created by the MIDI sequencer that it 

617 references or False otherwise" 

618 """ 

619 return self.sequencer.client_id == self.info.client 

620 

621 @property 

622 def iter_ports(self) -> Iterable["Port"]: 

623 """An iterator over all open ports for this client. It returns new Port each time""" 

624 for port in iter_read_ports(self.sequencer, self.client_id): 

625 yield Port(self.sequencer, port) 

626 

627 @property 

628 def ports(self) -> Sequence["Port"]: 

629 """Returns a new list of all open ports for this client""" 

630 return list(self.iter_ports) 

631 

632 def get_port(self, port_id: int) -> "Port": 

633 return self.sequencer.get_port((self.client_id, port_id)) 

634 

635 def refresh(self): 

636 self.info = read_client_info(self.sequencer, self.client_id) 

637 

638 

639class Port: 

640 """ 

641 MIDI sequencer port. Don't instantiate this object directly 

642 Use instead `Sequencer.get_port()` 

643 """ 

644 

645 def __init__(self, sequencer: Sequencer, port: snd_seq_port_info): 

646 self.sequencer = sequencer 

647 self.info = port 

648 

649 def __repr__(self): 

650 prefix = self.is_local and "Local" or "" 

651 name = f"{prefix}{type(self).__name__}" 

652 client = self.client_id 

653 port = self.port_id 

654 ptype = str(self.type).split(".", 1)[-1] 

655 caps = str(self.capability).split(".", 1)[-1] 

656 return f"<{name} {client=}, {port=}, type={ptype}, {caps=}>" 

657 

658 def __str__(self): 

659 ptype = str(self.type).split(".", 1)[-1] 

660 caps = str(self.capability).split(".", 1)[-1] 

661 return f"{self.client_id:3}:{self.port_id:<3} {self.name} {ptype} {caps}" 

662 

663 def __int__(self): 

664 "The port ID" 

665 return self.port_id 

666 

667 @property 

668 def name(self) -> str: 

669 "Port name" 

670 return self.info.name.decode() 

671 

672 @property 

673 def is_local(self) -> bool: 

674 """ 

675 True if the port was created by the MIDI sequencer that it 

676 references or False otherwise" 

677 """ 

678 return self.sequencer.client_id == self.info.addr.client 

679 

680 @property 

681 def client_id(self) -> int: 

682 """The client ID""" 

683 return self.info.addr.client 

684 

685 @property 

686 def port_id(self) -> int: 

687 """The port ID""" 

688 return self.info.addr.port 

689 

690 @property 

691 def type(self) -> PortType: 

692 """The port type""" 

693 return PortType(self.info.type) 

694 

695 @property 

696 def capability(self) -> PortCapability: 

697 """The port capabilities""" 

698 return PortCapability(self.info.capability) 

699 

700 @property 

701 def address(self) -> snd_seq_addr: 

702 """The port address""" 

703 return self.info.addr 

704 

705 # MIDI In 

706 def connect_from(self, src: FullPortAddress): 

707 """ 

708 Connect this port to a remote port. After connecting, this port will 

709 receive events originating from the source port. 

710 

711 Example: 

712 

713 ```python 

714 from linuxpy.midi.device import Sequencer 

715 

716 with Sequencer() as midi: 

717 port = midi.create_port() 

718 port.connect_from((0, 1)) 

719 for event in midi: 

720 print(event) 

721 ``` 

722 """ 

723 if not self.is_local: 

724 raise MidiError("Can only connect local port") 

725 self.sequencer.subscribe(src, self.address) 

726 

727 def disconnect_from(self, src: FullPortAddress): 

728 """ 

729 Disconnect this port from a previously connected source port. 

730 """ 

731 if not self.is_local: 

732 raise MidiError("Can only disconnect local port") 

733 self.sequencer.unsubscribe(src, self.address) 

734 

735 # MIDI Out 

736 def connect_to(self, dest: FullPortAddress): 

737 """ 

738 Connect this port to a remote port. After connecting, events 

739 originating from this port will be sent to the destination port. 

740 

741 Example: 

742 

743 ```python 

744 from linuxpy.midi.device import Sequencer 

745 

746 with Sequencer() as midi: 

747 port = midi.create_port() 

748 # Assume 14:0 is Midi Through 

749 port.connect_to((14, 0)) 

750 port.send("note on", note=11, velocity=10) 

751 ``` 

752 

753 """ 

754 if not self.is_local: 

755 raise MidiError("Can only connect local port") 

756 self.sequencer.subscribe(self.address, dest) 

757 

758 def disconnect_to(self, dest: FullPortAddress): 

759 """ 

760 Disconnect this port from a previously connected destination port. 

761 """ 

762 if not self.is_local: 

763 raise MidiError("Can only disconnect local port") 

764 self.sequencer.unsubscribe(self.address, dest) 

765 

766 def delete(self): 

767 """ 

768 Delete this port. Raises MidiError if port is not local. 

769 Any subscriptions are canceled before the port is deleted. 

770 """ 

771 if not self.is_local: 

772 raise MidiError("Can only delete local port") 

773 self.sequencer.delete_port(self) 

774 

775 def send(self, event_type: Union[str, int, EventType], **kwargs): 

776 """ 

777 Send a message of the given type from to the destination address(es). 

778 Use kwargs to pass specific event arguments like velocity in a 

779 "note on" event. 

780 

781 event_type can be an instance of EventType or the equivalent number or 

782 a case insensitive string matching the event type (ex: "noteon", "NOTEON", 

783 "note-on" or "note on"). 

784 

785 The following example sends "note on" on note 42, with velocity 45: 

786 

787 ```python 

788 port.send("note on", note=42, velocity=45) 

789 ``` 

790 """ 

791 self.sequencer.send(self, event_type, **kwargs) 

792 

793 

794EVENT_TYPE_INFO = { 

795 EventType.SYSTEM: ("System", "result"), 

796 EventType.RESULT: ("Result", "result"), 

797 EventType.NOTE: ("Note", "note"), 

798 EventType.NOTEON: ("Note on", "note"), 

799 EventType.NOTEOFF: ("Note off", "note"), 

800 EventType.KEYPRESS: ("Polyphonic aftertouch", "note"), 

801 EventType.CONTROLLER: ("Control change", "control"), 

802 EventType.PGMCHANGE: ("Program change", "control"), 

803 EventType.CHANPRESS: ("Channel aftertouch", "control"), 

804 EventType.PITCHBEND: ("Pitch bend", "control"), 

805 EventType.CONTROL14: ("Control change", "control"), 

806 EventType.NONREGPARAM: ("Non-reg. param.", "control"), 

807 EventType.REGPARAM: ("Reg param.", "control"), 

808 EventType.SONGPOS: ("Song position ptr", "control"), 

809 EventType.SONGSEL: ("Song select", "control"), 

810 EventType.QFRAME: ("MTC Quarter frame", "control"), 

811 EventType.TIMESIGN: ("SMF time signature", "control"), 

812 EventType.KEYSIGN: ("SMF key signature", "control"), 

813 EventType.START: ("Start", "queue"), 

814 EventType.CONTINUE: ("Continue", "queue"), 

815 EventType.STOP: ("Stop", "queue"), 

816 EventType.SETPOS_TICK: ("Set tick queue pos.", "queue"), 

817 EventType.SETPOS_TIME: ("Set rt queue pos.", "queue"), 

818 EventType.TEMPO: ("Set queue tempo", "queue"), 

819 EventType.CLOCK: ("Clock", "queue"), 

820 EventType.TICK: ("Tick", "queue"), 

821 EventType.QUEUE_SKEW: ("Queue timer skew", "queue"), 

822 EventType.TUNE_REQUEST: ("Tune request", None), 

823 EventType.RESET: ("Reset", None), 

824 EventType.SENSING: ("Active sensing", None), 

825 EventType.CLIENT_START: ("Client start", "addr"), 

826 EventType.CLIENT_EXIT: ("Client exit", "addr"), 

827 EventType.CLIENT_CHANGE: ("Client change", "addr"), 

828 EventType.PORT_START: ("Port start", "addr"), 

829 EventType.PORT_EXIT: ("Port exit", "addr"), 

830 EventType.PORT_CHANGE: ("Port change", "addr"), 

831 EventType.PORT_SUBSCRIBED: ("Port subscribed", "connect"), 

832 EventType.PORT_UNSUBSCRIBED: ("Port unsubscribed", "connect"), 

833 EventType.SYSEX: ("System exclusive", "ext"), 

834} 

835 

836 

837def struct_text(obj): 

838 fields = [] 

839 for field_name, _, value in obj: 

840 if isinstance(value, Struct): 

841 value = f"({struct_text(value)})" 

842 elif isinstance(value, ctypes.Union): 

843 continue 

844 else: 

845 value = str(value) 

846 fields.append(f"{field_name}={value}") 

847 return ", ".join(fields) 

848 

849 

850class Event: 

851 """Event message object result of listening on a sequencer""" 

852 

853 SIXEX_START = RealtimeStatusCode.SYSEX_START.to_bytes(1, "big") 

854 SIXEX_END = RealtimeStatusCode.SYSEX_END.to_bytes(1, "big") 

855 

856 def __init__(self, event: snd_seq_event): 

857 self.event = event 

858 self.data = b"" 

859 

860 def __repr__(self): 

861 cname = type(self).__name__ 

862 return f"<{cname} type={self.type.name}>" 

863 

864 def __str__(self): 

865 data = EVENT_TYPE_INFO.get(self.type) 

866 if data is None: 

867 return self.type.name 

868 name, member_name = data 

869 addr = f"{self.source_client_id:}:{self.source_port_id:} {self.dest_client_id:}:{self.dest_port_id:}" 

870 result = f"{addr} {name} " 

871 if self.type == EventType.SYSEX: 

872 result += " ".join(f"{i:02X}" for i in self.raw_data) 

873 elif self.type == EventType.CLOCK: 

874 queue_ctrl = self.queue_ctrl 

875 real_time = queue_ctrl.param.time.time 

876 timestamp = real_time.tv_sec + real_time.tv_nsec * 1e-9 

877 result += f"queue={queue_ctrl.queue} {timestamp=}" 

878 elif member_name: 

879 member = getattr(self.event.data, member_name) 

880 result += struct_text(member) 

881 return result 

882 

883 def __bytes__(self): 

884 """Serialize the Event in a bytes ready to be sent""" 

885 if self.type == EventType.SYSEX: 

886 self.event.flags = EventLength.VARIABLE 

887 if self.is_variable_length_type: 

888 data = self.raw_data 

889 self.event.data.ext.len = len(data) 

890 self.event.data.ext.ptr = cvoidp() 

891 return bytes(self.event) + data 

892 return bytes(self.event) 

893 

894 @classmethod 

895 def new(cls, etype: EventT, **kwargs): 

896 """Create new Event of the given type""" 

897 data = kwargs.pop("data", b"") 

898 event = snd_seq_event() 

899 etype = to_event_type(etype) 

900 event.type = etype 

901 event_info = EVENT_TYPE_INFO.get(etype) 

902 if event_info: 

903 _, member_name = event_info 

904 member = getattr(event.data, member_name) 

905 allowed = {name for name, _ in member._fields_} 

906 not_allowed = set(kwargs) - allowed 

907 if not_allowed: 

908 raise ValueError(f"These fields are not allowed for {etype.name}: {', '.join(not_allowed)}") 

909 for key, value in kwargs.items(): 

910 setattr(member, key, value) 

911 result = cls(event) 

912 result.data = data 

913 return result 

914 

915 @property 

916 def raw_data(self) -> bytes: 

917 if self.type == EventType.SYSEX and not self.data.startswith(self.SIXEX_START): 

918 return self.SIXEX_START + self.data + self.SIXEX_END 

919 return self.data 

920 

921 @property 

922 def type(self) -> EventType: 

923 return EventType(self.event.type) 

924 

925 @property 

926 def length_type(self) -> EventLength: 

927 return EventLength(self.event.flags & EventLength.MASK.value) 

928 

929 @property 

930 def is_variable_length_type(self) -> bool: 

931 return self.length_type == EventLength.VARIABLE 

932 

933 @property 

934 def timestamp_type(self) -> TimeStamp: 

935 return TimeStamp(self.event.flags & TimeStamp.MASK.value) 

936 

937 @property 

938 def time_mode(self) -> TimeMode: 

939 return TimeMode(self.event.flags & TimeMode.MASK.value) 

940 

941 @property 

942 def timestamp(self) -> Union[float, int]: 

943 if self.timestamp_type == TimeStamp.REAL: 

944 return self.event.data.time.time.tv_sec + self.event.data.time.time.tv_nsec * 1e-9 

945 return self.event.data.time.tick 

946 

947 @property 

948 def note(self): 

949 return self.event.data.note 

950 

951 @property 

952 def control(self): 

953 return self.event.data.control 

954 

955 @property 

956 def connect(self): 

957 return self.event.data.connect 

958 

959 @property 

960 def queue_ctrl(self): 

961 return self.event.data.queue 

962 

963 @property 

964 def source(self) -> snd_seq_addr: 

965 return self.event.source 

966 

967 @source.setter 

968 def source(self, address: FullPortAddress): 

969 self.event.source = to_address(address) 

970 

971 @property 

972 def dest(self) -> snd_seq_addr: 

973 return self.event.dest 

974 

975 @dest.setter 

976 def dest(self, address: FullPortAddress): 

977 self.event.dest = to_address(address) 

978 

979 @property 

980 def source_client_id(self) -> int: 

981 return self.event.source.client 

982 

983 @property 

984 def source_port_id(self) -> int: 

985 return self.event.source.port 

986 

987 @property 

988 def dest_client_id(self) -> int: 

989 return self.event.dest.client 

990 

991 @property 

992 def dest_port_id(self) -> int: 

993 return self.event.dest.port 

994 

995 @property 

996 def queue(self): 

997 return self.event.queue 

998 

999 @queue.setter 

1000 def queue(self, queue: int): 

1001 self.event.queue = queue 

1002 

1003 client_id = source_client_id 

1004 port_id = source_port_id 

1005 

1006 

1007def event_stream(sequencer: Sequencer) -> Iterable[Event]: 

1008 """Infinite stream of events coming from the given sequencer""" 

1009 while True: 

1010 yield from sequencer.read() 

1011 

1012 

1013async def async_event_stream(sequencer: Sequencer, maxsize: int = 10) -> AsyncIterable[Event]: 

1014 """Infinite async stream of events coming from the given sequencer""" 

1015 queue = asyncio.Queue(maxsize=maxsize) 

1016 

1017 def feed(): 

1018 for event in sequencer.read(): 

1019 queue.put_nowait(event) 

1020 

1021 with add_reader_asyncio(sequencer.fileno(), feed): 

1022 while True: 

1023 yield await queue.get() 

1024 

1025 

1026if __name__ == "__main__": 1026 ↛ 1027line 1026 didn't jump to line 1027 because the condition on line 1026 was never true

1027 seq = Sequencer() 

1028 seq.open()