Coverage for linuxpy/midi/device.py: 34%
503 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-10-18 17:55 +0200
« prev ^ index » next coverage.py v7.6.1, created at 2024-10-18 17:55 +0200
1#
2# This file is part of the linuxpy project
3#
4# Copyright (c) 2023 Tiago Coutinho
5# Distributed under the GPLv3 license. See LICENSE for more info.
"""
Human friendly interface to linux MIDI subsystem.

The heart of linuxpy MIDI library is the [`Sequencer`][linuxpy.midi.device.Sequencer]
class.
Usually you need only one instance of Sequencer for your application.
The recommended way is to use it within a context manager like:

```python
with Sequencer("My MIDI App") as midi:
    print(f"MIDI version: {midi.version}")
```

which is roughly equivalent to:

```python
midi = Sequencer("My MIDI App")
midi.open()
try:
    print(f"MIDI version: {midi.version}")
finally:
    midi.close()
```

Here's a real world example:

```python
from linuxpy.midi.device import Sequencer

with Sequencer("My MIDI App") as midi:
    print(f"I'm client {midi.client_id}")
    print(f"MIDI version: {midi.version}")
    port = midi.create_port()
    port.connect_from((0, 1))
    for event in midi:
        print(event)
```
"""
46import asyncio
47import contextlib
48import errno
49import itertools
50from enum import IntEnum
52from linuxpy import ctypes
53from linuxpy.ctypes import Struct, cint, cvoidp, sizeof
54from linuxpy.device import DEV_PATH, BaseDevice
55from linuxpy.ioctl import ioctl
56from linuxpy.midi.raw import (
57 IOC,
58 ClientType,
59 EventLength,
60 EventType,
61 PortCapability,
62 PortType,
63 TimeMode,
64 TimeStamp,
65 snd_seq_addr,
66 snd_seq_client_info,
67 snd_seq_event,
68 snd_seq_port_info,
69 snd_seq_port_subscribe,
70 snd_seq_query_subs,
71 snd_seq_queue_client,
72 snd_seq_queue_info,
73 snd_seq_queue_status,
74 snd_seq_running_info,
75 snd_seq_system_info,
76)
77from linuxpy.types import AsyncIterable, Iterable, Optional, Sequence, Union
78from linuxpy.util import Version, add_reader_asyncio
#: "snd" directory below DEV_PATH
ALSA_PATH = DEV_PATH / "snd"
#: path of the sequencer device opened by `Sequencer`
SEQUENCER_PATH = ALSA_PATH / "seq"
class RealtimeStatusCode(IntEnum):
    """
    MIDI status byte values in the 0xF0-0xFF range.

    `REALTIME` and `SYSEX_START` share the value 0xF0, which makes
    `SYSEX_START` an alias of `REALTIME`.
    """

    REALTIME = 0xF0
    SYSEX_START = 0xF0  # alias of REALTIME (same value)
    MIDI_TIME_CODE = 0xF1
    SONG_POSITION = 0xF2
    SONG_SELECT = 0xF3
    TUNE_REQUEST = 0xF6
    SYSEX_END = 0xF7
    TIMING_CLOCK = 0xF8
    START = 0xFA
    CONTINUE = 0xFB
    STOP = 0xFC
    ACTIVE_SENSING = 0xFE
    RESET = 0xFF
#: unknown source
ADDRESS_UNKNOWN = 253

#: direct dispatch
QUEUE_DIRECT = 253

#: send event to all subscribed ports
ADDRESS_SUBSCRIBERS = 254

#: send event to all queues/clients/ports/channels
ADDRESS_BROADCAST = 255

#: (client, port) destination used by default in `Sequencer.send()`
SUBSCRIBERS = (ADDRESS_SUBSCRIBERS, ADDRESS_UNKNOWN)
#: (client, port) destination built from the broadcast address
BROADCAST = (ADDRESS_BROADCAST, ADDRESS_BROADCAST)

#: size of event (bytes)
EVENT_SIZE = sizeof(snd_seq_event)

#: READ + SUBSCRIBE READ port capabilities
INPUT = PortCapability.READ | PortCapability.SUBS_READ

#: WRITE + SUBSCRIBE WRITE port capabilities
OUTPUT = PortCapability.WRITE | PortCapability.SUBS_WRITE

#: full READ + WRITE port capabilities
INPUT_OUTPUT = INPUT | OUTPUT


# A port inside a known client: either its numeric ID or a Port object
PortAddress = Union[int, "Port"]
# A fully qualified port: low level address or a (client, port) pair
FullPortAddress = Union[snd_seq_addr, tuple[int, PortAddress]]
# Anything accepted by `to_event_type()`
EventT = Union[str, int, EventType]
# Several fully qualified ports
FullPortAddresses = Sequence[FullPortAddress]
class MidiError(Exception):
    """Error raised by this module when a MIDI operation is refused"""
def read_pversion(seq) -> int:
    """Run the PVERSION ioctl on *seq* and return the resulting integer"""
    result = ioctl(seq, IOC.PVERSION, cint())
    return result.value
def read_system_info(seq) -> snd_seq_system_info:
    """Run the SYSTEM_INFO ioctl on *seq* with a fresh snd_seq_system_info"""
    info = snd_seq_system_info()
    return ioctl(seq, IOC.SYSTEM_INFO, info)
def read_running_mode(seq) -> snd_seq_running_info:
    """Run the RUNNING_MODE ioctl on *seq* with a fresh snd_seq_running_info"""
    info = snd_seq_running_info()
    return ioctl(seq, IOC.RUNNING_MODE, info)
def read_client_id(seq) -> int:
    """Run the CLIENT_ID ioctl on *seq* and return the resulting integer"""
    result = ioctl(seq, IOC.CLIENT_ID, cint())
    return result.value
def read_client_info(seq, client_id: int) -> snd_seq_client_info:
    """Run the GET_CLIENT_INFO ioctl on *seq* for the given client ID"""
    query = snd_seq_client_info(client=client_id)
    return ioctl(seq, IOC.GET_CLIENT_INFO, query)
def write_client_info(seq, client: snd_seq_client_info):
    """Run the SET_CLIENT_INFO ioctl on *seq* with the given client structure"""
    result = ioctl(seq, IOC.SET_CLIENT_INFO, client)
    return result
def next_client(seq, client_id: int) -> Optional[snd_seq_client_info]:
    """
    Run the QUERY_NEXT_CLIENT ioctl starting from *client_id*.
    Returns None when the ioctl fails with FileNotFoundError.
    """
    try:
        query = snd_seq_client_info(client=client_id)
        return ioctl(seq, IOC.QUERY_NEXT_CLIENT, query)
    except FileNotFoundError:
        return None
def iter_read_clients(seq) -> Iterable[snd_seq_client_info]:
    """
    Iterate over client structures by repeatedly calling `next_client()`,
    starting at ID -1, until it returns a false value.
    """
    cursor = -1
    while True:
        info = next_client(seq, cursor)
        if not info:
            return
        yield info
        # continue the query after the client just reported
        cursor = info.client
def read_port_info(seq, client_id: int, port_id: int) -> snd_seq_port_info:
    """Run the GET_PORT_INFO ioctl on *seq* for port *client_id*:*port_id*"""
    query = snd_seq_port_info(client=client_id)
    address = query.addr
    address.client = client_id
    address.port = port_id
    return ioctl(seq, IOC.GET_PORT_INFO, query)
def next_port(seq, client_id: int, port_id: int) -> Optional[snd_seq_port_info]:
    """
    Run the QUERY_NEXT_PORT ioctl starting from port *client_id*:*port_id*.
    Returns None when the ioctl fails with FileNotFoundError.
    """
    query = snd_seq_port_info(client=client_id)
    address = query.addr
    address.client = client_id
    address.port = port_id
    try:
        return ioctl(seq, IOC.QUERY_NEXT_PORT, query)
    except FileNotFoundError:
        return None
def iter_read_ports(seq, client_id: int) -> Iterable[snd_seq_port_info]:
    """
    Iterate over the port structures of a client by repeatedly calling
    `next_port()`, starting at port -1, until it returns a false value.
    """
    cursor = -1
    while True:
        info = next_port(seq, client_id, cursor)
        if not info:
            return
        yield info
        # continue the query after the port just reported
        cursor = info.addr.port
def create_port(seq, port: snd_seq_port_info):
    """Run the CREATE_PORT ioctl on *seq* with the given port structure"""
    result = ioctl(seq, IOC.CREATE_PORT, port)
    return result
def delete_port(seq, port: snd_seq_port_info):
    """Run the DELETE_PORT ioctl on *seq* with the given port structure"""
    result = ioctl(seq, IOC.DELETE_PORT, port)
    return result
def subscribe(seq, src: FullPortAddress, dest: FullPortAddress):
    """Run the SUBSCRIBE_PORT ioctl with *src* as sender and *dest* as destination"""
    request = snd_seq_port_subscribe()
    request.sender = to_address(src)
    request.dest = to_address(dest)
    return ioctl(seq, IOC.SUBSCRIBE_PORT, request)
def unsubscribe(seq, src: FullPortAddress, dest: FullPortAddress):
    """Run the UNSUBSCRIBE_PORT ioctl with *src* as sender and *dest* as destination"""
    request = snd_seq_port_subscribe()
    request.sender = to_address(src)
    request.dest = to_address(dest)
    return ioctl(seq, IOC.UNSUBSCRIBE_PORT, request)
def iter_read_subscribers(seq, client_id: int, port_id: int):
    """
    Iterate over the subscriptions of port *client_id*:*port_id*.

    Issues one QUERY_SUBS ioctl per index (0, 1, 2, ...) and yields the
    filled snd_seq_query_subs structure of each one. Iteration stops when
    the reported `num_subs` is reached or when the ioctl fails with
    FileNotFoundError (index out of range, e.g. a port without any
    subscription). This is the same convention `next_client()` and
    `next_port()` use for "nothing more to report".
    """
    for index in itertools.count():
        query = snd_seq_query_subs()
        query.root.client = client_id
        query.root.port = port_id
        query.index = index
        try:
            ioctl(seq, IOC.QUERY_SUBS, query)
        except FileNotFoundError:
            # no subscription at this index: nothing (more) to yield
            return
        if query.index == query.num_subs:
            return
        yield query
        # avoid one extra (failing) ioctl when this was the last entry
        if query.index + 1 == query.num_subs:
            return
def create_queue(seq) -> snd_seq_queue_info:
    """Run the CREATE_QUEUE ioctl on *seq* with a fresh snd_seq_queue_info"""
    info = snd_seq_queue_info()
    return ioctl(seq, IOC.CREATE_QUEUE, info)
def delete_queue(seq, queue: int) -> snd_seq_queue_info:
    """Run the DELETE_QUEUE ioctl on *seq* for the given queue number"""
    info = snd_seq_queue_info(queue=queue)
    return ioctl(seq, IOC.DELETE_QUEUE, info)
def write_queue_info(seq, queue: snd_seq_queue_info):
    """Run the SET_QUEUE_INFO ioctl on *seq* with the given queue structure"""
    result = ioctl(seq, IOC.SET_QUEUE_INFO, queue)
    return result
def read_queue_status(seq, queue: int) -> snd_seq_queue_status:
    """Run the GET_QUEUE_STATUS ioctl on *seq* for the given queue number"""
    status = snd_seq_queue_status(queue=queue)
    return ioctl(seq, IOC.GET_QUEUE_STATUS, status)
def read_queue_client(seq, queue: int) -> snd_seq_queue_client:
    """Run the GET_QUEUE_CLIENT ioctl on *seq* for the given queue number"""
    query = snd_seq_queue_client(queue=queue)
    return ioctl(seq, IOC.GET_QUEUE_CLIENT, query)
def to_address(addr: FullPortAddress) -> snd_seq_addr:
    """
    Convert to low level snd_seq_addr.

    A snd_seq_addr is returned unchanged; anything else is iterated, each
    item is converted with `int()` and the results are passed positionally
    to snd_seq_addr.
    """
    if not isinstance(addr, snd_seq_addr):
        fields = [int(item) for item in addr]
        return snd_seq_addr(*fields)
    return addr
def to_event_type(event_type: EventT) -> EventType:
    """
    Convert an event type given as EventType, number or name to EventType.

    Names are case insensitive and may use spaces, hyphens or underscores
    between words (or no separator at all): "noteon", "NOTEON", "note on",
    "note-on" and "note_on" all resolve to `EventType.NOTEON`.

    Raises ValueError for an unknown number (from the `EventType(...)`
    call) and KeyError for an unknown name.
    """
    if isinstance(event_type, EventType):
        return event_type
    if isinstance(event_type, int):
        return EventType(event_type)
    # Normalize every supported word separator to "_" (ex: PORT_START)
    name = event_type.upper().replace(" ", "_").replace("-", "_")
    try:
        return EventType[name]
    except KeyError:
        # Some members have no separator at all (ex: NOTEON)
        return EventType[name.replace("_", "")]
class Sequencer(BaseDevice):
    """
    Central MIDI class.

    ```python
    from linuxpy.midi.device import Sequencer

    with Sequencer("My MIDI App") as midi:
        print(f"I'm client {midi.client_id}")
        print(f"MIDI version: {midi.version}")
        port = midi.create_port()
        port.connect_from((0, 1))
        for event in midi:
            print(event)
    ```
    """

    def __init__(self, name: str = "linuxpy client", **kwargs):
        self.name = name
        # -1 until the device is opened (see _on_open)
        self.client_id = -1
        self.version: Optional[Version] = None
        # IDs of the ports created through create_port()
        self._local_ports = set()
        # (src client, src port, dest client, dest port) of subscriptions
        # made through subscribe()
        self.subscriptions = set()
        super().__init__(SEQUENCER_PATH, **kwargs)

    def __iter__(self) -> Iterable["Event"]:
        """
        Build an infinite iterator that streams MIDI events from the
        subscribed ports.
        You'll need an open sequencer before using it:

        ```python
        from linuxpy.midi.device import Sequencer

        with Sequencer() as midi:
            port = midi.create_port()
            port.connect_from((0, 1))
            for event in midi:
                print(event)
        ```
        """
        return event_stream(self)

    async def __aiter__(self) -> AsyncIterable["Event"]:
        """
        Build an infinite async iterator that streams MIDI events from the
        subscribed ports.
        You'll need an open sequencer before using it:

        ```python
        import asyncio
        from linuxpy.midi.device import Sequencer

        async def main():
            with Sequencer() as midi:
                port = midi.create_port()
                port.connect_from((0, 1))
                async for event in midi:
                    print(event)

        asyncio.run(main())
        ```
        """
        async for event in async_event_stream(self, maxsize=10):
            yield event

    def _on_open(self):
        """Read client ID and version and register the client name"""
        self.client_id = read_client_id(self)
        client_info = read_client_info(self, self.client_id)
        self.version = Version.from_number(read_pversion(self))
        client_info.name = self.name.encode()
        write_client_info(self, client_info)

    def _on_close(self):
        """Delete every port created through `create_port()`"""
        # iterate over a copy: delete_port() mutates self._local_ports
        for port in set(self._local_ports):
            self.delete_port(port)

    @property
    def client_info(self) -> snd_seq_client_info:
        """Current Client information"""
        return read_client_info(self, self.client_id)

    @property
    def client(self) -> "Client":
        """Current Client information"""
        return self.get_client(self.client_id)

    @property
    def running_mode(self) -> snd_seq_running_info:
        """Current running mode"""
        return read_running_mode(self)

    @property
    def system_info(self) -> snd_seq_system_info:
        """Current system information"""
        return read_system_info(self)

    def get_client(self, client_id: int) -> "Client":
        """
        Returns a Client for the given ID or raises an error if the client
        doesn't exist.
        It returns new Client object each time
        """
        info = read_client_info(self, client_id)
        return Client(self, info)

    @property
    def iter_clients(self) -> Iterable["Client"]:
        """An iterator over all open clients on the system. It returns new Client each time"""
        return (Client(self, client_info) for client_info in iter_read_clients(self))

    @property
    def clients(self) -> Sequence["Client"]:
        """Returns a new list of all clients on the system"""
        return list(self.iter_clients)

    @property
    def iter_ports(self) -> Iterable["Port"]:
        """
        An iterator over all open ports on the system.
        It returns new Port objects each time
        """
        for client in self.iter_clients:
            for port in iter_read_ports(self, client.client_id):
                yield Port(self, port)

    @property
    def ports(self) -> Sequence["Port"]:
        """
        Returns a new list of all open ports on the system.
        It returns new Port objects each time
        """
        return list(self.iter_ports)

    def get_port(self, address: FullPortAddress) -> "Port":
        """
        Returns a Port for the given address or raises an error if the port
        doesn't exist.
        It returns new Port object each time
        """
        port_address = to_address(address)
        info = read_port_info(self, port_address.client, port_address.port)
        return Port(self, info)

    def create_port(
        self,
        name: str = "linuxpy port",
        capabilities: PortCapability = INPUT_OUTPUT,
        port_type: PortType = PortType.MIDI_GENERIC | PortType.APPLICATION,
    ) -> "Port":
        """
        Create a new local port. By default it will create a MIDI generic
        application Input/Output port.
        """
        port_info = snd_seq_port_info()
        port_info.name = name.encode()
        port_info.addr.client = self.client_id
        port_info.capability = capabilities
        port_info.type = port_type
        port_info.midi_channels = 16
        port_info.midi_voices = 64
        port_info.synth_voices = 0
        create_port(self, port_info)
        port = Port(self, port_info)
        # remember the port so it can be deleted when the sequencer closes
        self._local_ports.add(port_info.addr.port)
        return port

    def delete_port(self, port: Union[int, "Port"]):
        """
        Delete a previously created local port. If the port has any
        subscriptions they will be closed before the port is deleted.

        Raises MidiError if a Port object of another client is given and
        KeyError if the port ID was not created through `create_port()`.
        """
        if isinstance(port, int):
            addr = self.client_id, port
            port_info = snd_seq_port_info(addr=snd_seq_addr(client=self.client_id, port=port))
        else:
            port_info = port.info
            if port_info.addr.client != self.client_id:
                raise MidiError("Cannot delete non local port")
            addr = port_info.addr.client, port_info.addr.port
        # unsubscribe first (iterate over a copy: unsubscribe() mutates the set)
        for uid in set(self.subscriptions):
            if addr in {uid[0:2], uid[2:4]}:
                self.unsubscribe(uid[0:2], uid[2:4])
        self._local_ports.remove(addr[1])
        delete_port(self, port_info)

    def subscribe(self, src: FullPortAddress, dest: FullPortAddress):
        """
        Subscribe a source port to a destination port
        """
        src = to_address(src)
        dest = to_address(dest)
        uid = (src.client, src.port, dest.client, dest.port)
        # Record the subscription only once the ioctl succeeded, otherwise a
        # failed subscribe would leave an entry that delete_port() / close
        # later tries (and fails) to unsubscribe
        subscribe(self, src, dest)
        self.subscriptions.add(uid)

    def unsubscribe(self, src: FullPortAddress, dest: FullPortAddress):
        """
        Unsubscribe a previously subscribed source port to a destination port.

        Raises KeyError if the subscription was not made through
        `subscribe()`. An ENXIO error from the device is only logged; any
        other OSError is propagated.
        """
        src = to_address(src)
        dest = to_address(dest)
        uid = (src.client, src.port, dest.client, dest.port)
        self.subscriptions.remove(uid)
        try:
            unsubscribe(self, src, dest)
        except OSError as error:
            if error.errno == errno.ENXIO:
                self.log.info("Could not unsubscribe (maybe device was unplugged)")
            else:
                raise

    def iter_raw_read(self, max_nb_packets: int = 64) -> Iterable["Event"]:
        """
        Read list of pending events. If the sequencer is opened in blocking
        mode and there are no events it blocks until at least one event occurs
        otherwise an OSError is raised.

        Use the `read()` call instead because it handles blocking vs
        non-blocking variants transparently.
        """
        payload = self._fobj.read(max_nb_packets * EVENT_SIZE)
        nb_packets = len(payload) // EVENT_SIZE
        i = 0
        while i < nb_packets:
            start = i * EVENT_SIZE
            packet = payload[start : start + EVENT_SIZE]
            event = Event(snd_seq_event.from_buffer_copy(packet))
            i += 1
            if event.is_variable_length_type:
                # The variable length data follows the event and occupies a
                # whole number of EVENT_SIZE slots
                size = event.event.data.ext.len
                nb = (size + EVENT_SIZE - 1) // EVENT_SIZE
                # First and last byte are dropped; presumably the SYSEX
                # start/end markers that `Event.raw_data` adds back
                event.data = payload[i * EVENT_SIZE : i * EVENT_SIZE + size][1:-1]
                i += nb
            yield event

    def raw_read(self, max_nb_packets=64) -> Sequence["Event"]:
        """
        Read list of pending events. If there are no events it blocks until at
        least one event occurs and returns it.

        Use the `read()` call instead because it handles blocking vs
        non-blocking variants transparently.
        """
        return tuple(self.iter_raw_read(max_nb_packets=max_nb_packets))

    def wait_read(self) -> Sequence["Event"]:
        """
        Read list of pending events. If there are no events it blocks until at
        least one event occurs and returns it.
        This method assumes the internal file descriptor was opened in
        non-blocking mode.

        Use the `read()` call instead because it handles blocking vs
        non-blocking variants transparently
        """
        if self.io.select is not None:
            self.io.select((self,), (), ())
        return self.raw_read()

    def read(self) -> Sequence["Event"]:
        """
        Read list of pending events. If there are no events it blocks until at
        least one event occurs and returns it
        """
        # first time we check what mode device was opened (blocking vs non-blocking)
        # and shadow this method on the instance with the matching variant
        if self.is_blocking:
            self.read = self.raw_read
        else:
            self.read = self.wait_read
        return self.read()

    def write(self, event: "Event"):
        """Send an event message"""
        self._fobj.write(bytes(event))

    def send(
        self,
        port: PortAddress,
        event_type: Union[str, int, EventType],
        queue: int = QUEUE_DIRECT,
        to: Union[FullPortAddress, FullPortAddresses] = SUBSCRIBERS,
        **kwargs,
    ):
        """
        Send a message of the given type from a specific port to the destination
        address(es). Use kwargs to pass specific event arguments like velocity in
        a "note on" event.

        event_type can be an instance of EventType or the equivalent number or
        a case insensitive string matching the event type (ex: "noteon", "NOTEON",
        "note-on" or "note on").

        `to` is either one address (a snd_seq_addr or a (client, port) pair)
        or a sequence of addresses. An empty sequence sends nothing.

        The following example sends "note on" with velocity 45 on port 0 of client 14:

        ```python
        midi.send((14, 0), "note on", velocity=45)
        ```
        """
        event = Event.new(event_type, **kwargs)
        event.queue = queue
        event.source = to_address((self.client_id, port))
        # Tell a single address from a sequence of addresses: a single
        # (client, port) pair starts with a number, a sequence of addresses
        # starts with a snd_seq_addr or with another pair
        destinations = (to,)
        if isinstance(to, Sequence):
            if not to:
                destinations = ()
            elif isinstance(to[0], snd_seq_addr) or isinstance(to[0], Sequence):
                destinations = to
        for dest in destinations:
            event.dest = to_address(dest)
            self.write(event)
class Client:
    """
    MIDI sequencer client. Don't instantiate this object directly
    Use instead `Sequencer.get_client()`
    """

    def __init__(self, sequencer: Sequencer, client: snd_seq_client_info):
        self.info = client
        self.sequencer = sequencer

    def __int__(self):
        """The client ID"""
        return self.client_id

    @property
    def name(self) -> str:
        """Client name (decoded from the low level info structure)"""
        raw_name = self.info.name
        return raw_name.decode()

    @property
    def client_id(self):
        """The client ID"""
        return self.info.client

    @property
    def type(self):
        """The client type as a ClientType"""
        return ClientType(self.info.type)

    @property
    def is_local(self) -> bool:
        """
        True if the client was created by the MIDI sequencer that it
        references or False otherwise
        """
        return self.info.client == self.sequencer.client_id

    @property
    def iter_ports(self) -> Iterable["Port"]:
        """An iterator over all open ports for this client. It returns new Port each time"""
        sequencer = self.sequencer
        return (Port(sequencer, info) for info in iter_read_ports(sequencer, self.client_id))

    @property
    def ports(self) -> Sequence["Port"]:
        """Returns a new list of all open ports for this client"""
        return [*self.iter_ports]

    def get_port(self, port_id: int) -> "Port":
        """Return a new Port for the given port ID of this client"""
        address = (self.client_id, port_id)
        return self.sequencer.get_port(address)

    def refresh(self):
        """Read the client information again from the sequencer"""
        self.info = read_client_info(self.sequencer, self.client_id)
class Port:
    """
    MIDI sequencer port. Don't instantiate this object directly
    Use instead `Sequencer.get_port()`
    """

    def __init__(self, sequencer: Sequencer, port: snd_seq_port_info):
        self.info = port
        self.sequencer = sequencer

    def __repr__(self):
        # NOTE: the local names below are part of the output ({client=} ...)
        client = self.client_id
        port = self.port_id
        ptype = str(self.type).split(".", 1)[-1]
        caps = str(self.capability).split(".", 1)[-1]
        prefix = "Local" if self.is_local else ""
        name = f"{prefix}{type(self).__name__}"
        return f"<{name} {client=}, {port=}, type={ptype}, {caps=}>"

    def __str__(self):
        caps = str(self.capability).split(".", 1)[-1]
        ptype = str(self.type).split(".", 1)[-1]
        return f"{self.client_id:3}:{self.port_id:<3} {self.name} {ptype} {caps}"

    def __int__(self):
        """The port ID"""
        return self.port_id

    @property
    def name(self) -> str:
        """Port name (decoded from the low level info structure)"""
        raw_name = self.info.name
        return raw_name.decode()

    @property
    def is_local(self) -> bool:
        """
        True if the port was created by the MIDI sequencer that it
        references or False otherwise
        """
        return self.info.addr.client == self.sequencer.client_id

    @property
    def client_id(self) -> int:
        """The client ID"""
        return self.info.addr.client

    @property
    def port_id(self) -> int:
        """The port ID"""
        return self.info.addr.port

    @property
    def type(self) -> PortType:
        """The port type"""
        return PortType(self.info.type)

    @property
    def capability(self) -> PortCapability:
        """The port capabilities"""
        return PortCapability(self.info.capability)

    @property
    def address(self) -> snd_seq_addr:
        """The port address"""
        return self.info.addr

    # MIDI In
    def connect_from(self, src: FullPortAddress):
        """
        Connect this port to a remote port. After connecting, this port will
        receive events originating from the source port.
        Raises MidiError if this port is not local.

        Example:

        ```python
        from linuxpy.midi.device import Sequencer

        with Sequencer() as midi:
            port = midi.create_port()
            port.connect_from((0, 1))
            for event in midi:
                print(event)
        ```
        """
        if self.is_local:
            self.sequencer.subscribe(src, self.address)
            return
        raise MidiError("Can only connect local port")

    def disconnect_from(self, src: FullPortAddress):
        """
        Disconnect this port from a previously connected source port.
        Raises MidiError if this port is not local.
        """
        if self.is_local:
            self.sequencer.unsubscribe(src, self.address)
            return
        raise MidiError("Can only disconnect local port")

    # MIDI Out
    def connect_to(self, dest: FullPortAddress):
        """
        Connect this port to a remote port. After connecting, events
        originating from this port will be sent to the destination port.
        Raises MidiError if this port is not local.

        Example:

        ```python
        from linuxpy.midi.device import Sequencer

        with Sequencer() as midi:
            port = midi.create_port()
            # Assume 14:0 is Midi Through
            port.connect_to((14, 0))
            port.send("note on", note=11, velocity=10)
        ```
        """
        if self.is_local:
            self.sequencer.subscribe(self.address, dest)
            return
        raise MidiError("Can only connect local port")

    def disconnect_to(self, dest: FullPortAddress):
        """
        Disconnect this port from a previously connected destination port.
        Raises MidiError if this port is not local.
        """
        if self.is_local:
            self.sequencer.unsubscribe(self.address, dest)
            return
        raise MidiError("Can only disconnect local port")

    def delete(self):
        """
        Delete this port. Raises MidiError if port is not local.
        Any subscriptions are canceled before the port is deleted.
        """
        if self.is_local:
            self.sequencer.delete_port(self)
            return
        raise MidiError("Can only delete local port")

    def send(self, event_type: Union[str, int, EventType], **kwargs):
        """
        Send a message of the given type from this port. All keyword
        arguments are forwarded to `Sequencer.send()`: use them to pass
        specific event arguments like velocity in a "note on" event.

        event_type can be an instance of EventType or the equivalent number or
        a case insensitive string matching the event type (ex: "noteon", "NOTEON",
        "note-on" or "note on").

        The following example sends "note on" on note 42, with velocity 45:

        ```python
        port.send("note on", note=42, velocity=45)
        ```
        """
        sequencer = self.sequencer
        sequencer.send(self, event_type, **kwargs)
# Maps each known event type to (human readable name, name of the member of
# the event data that carries its payload or None if it has no payload)
EVENT_TYPE_INFO = {
    etype: (label, member)
    for etype, label, member in (
        (EventType.SYSTEM, "System", "result"),
        (EventType.RESULT, "Result", "result"),
        (EventType.NOTE, "Note", "note"),
        (EventType.NOTEON, "Note on", "note"),
        (EventType.NOTEOFF, "Note off", "note"),
        (EventType.KEYPRESS, "Polyphonic aftertouch", "note"),
        (EventType.CONTROLLER, "Control change", "control"),
        (EventType.PGMCHANGE, "Program change", "control"),
        (EventType.CHANPRESS, "Channel aftertouch", "control"),
        (EventType.PITCHBEND, "Pitch bend", "control"),
        (EventType.CONTROL14, "Control change", "control"),
        (EventType.NONREGPARAM, "Non-reg. param.", "control"),
        (EventType.REGPARAM, "Reg param.", "control"),
        (EventType.SONGPOS, "Song position ptr", "control"),
        (EventType.SONGSEL, "Song select", "control"),
        (EventType.QFRAME, "MTC Quarter frame", "control"),
        (EventType.TIMESIGN, "SMF time signature", "control"),
        (EventType.KEYSIGN, "SMF key signature", "control"),
        (EventType.START, "Start", "queue"),
        (EventType.CONTINUE, "Continue", "queue"),
        (EventType.STOP, "Stop", "queue"),
        (EventType.SETPOS_TICK, "Set tick queue pos.", "queue"),
        (EventType.SETPOS_TIME, "Set rt queue pos.", "queue"),
        (EventType.TEMPO, "Set queue tempo", "queue"),
        (EventType.CLOCK, "Clock", "queue"),
        (EventType.TICK, "Tick", "queue"),
        (EventType.QUEUE_SKEW, "Queue timer skew", "queue"),
        (EventType.TUNE_REQUEST, "Tune request", None),
        (EventType.RESET, "Reset", None),
        (EventType.SENSING, "Active sensing", None),
        (EventType.CLIENT_START, "Client start", "addr"),
        (EventType.CLIENT_EXIT, "Client exit", "addr"),
        (EventType.CLIENT_CHANGE, "Client change", "addr"),
        (EventType.PORT_START, "Port start", "addr"),
        (EventType.PORT_EXIT, "Port exit", "addr"),
        (EventType.PORT_CHANGE, "Port change", "addr"),
        (EventType.PORT_SUBSCRIBED, "Port subscribed", "connect"),
        (EventType.PORT_UNSUBSCRIBED, "Port unsubscribed", "connect"),
        (EventType.SYSEX, "System exclusive", "ext"),
    )
}
def struct_text(obj):
    """
    Build a "name=value, name=value" text from the fields of *obj*.

    *obj* is iterated as (field name, <ignored>, value) triples. Struct
    values are rendered recursively between parentheses, Union values are
    left out and any other value is rendered with `str()`.
    """
    parts = []
    for field_name, _, value in obj:
        if isinstance(value, Struct):
            text = f"({struct_text(value)})"
        elif isinstance(value, ctypes.Union):
            continue
        else:
            text = str(value)
        parts.append(f"{field_name}={text}")
    return ", ".join(parts)
class Event:
    """Event message object result of listening on a sequencer"""

    # NOTE: "SIXEX" spelling kept because these are public class attributes
    SIXEX_START = RealtimeStatusCode.SYSEX_START.to_bytes(1, "big")
    SIXEX_END = RealtimeStatusCode.SYSEX_END.to_bytes(1, "big")

    def __init__(self, event: snd_seq_event):
        self.event = event
        # variable length payload (only used by variable length events)
        self.data = b""

    def __repr__(self):
        cname = type(self).__name__
        return f"<{cname} type={self.type.name}>"

    def __str__(self):
        data = EVENT_TYPE_INFO.get(self.type)
        if data is None:
            return self.type.name
        name, member_name = data
        addr = f"{self.source_client_id:}:{self.source_port_id:} {self.dest_client_id:}:{self.dest_port_id:}"
        result = f"{addr} {name} "
        if self.type == EventType.SYSEX:
            result += " ".join(f"{i:02X}" for i in self.raw_data)
        elif self.type == EventType.CLOCK:
            queue_ctrl = self.queue_ctrl
            real_time = queue_ctrl.param.time.time
            # NOTE: the local name is part of the output ({timestamp=})
            timestamp = real_time.tv_sec + real_time.tv_nsec * 1e-9
            result += f"queue={queue_ctrl.queue} {timestamp=}"
        elif member_name:
            member = getattr(self.event.data, member_name)
            result += struct_text(member)
        return result

    def __bytes__(self):
        """
        Serialize the Event in a bytes ready to be sent.

        SYSEX events are marked as variable length. For variable length
        events the payload (`raw_data`) is appended after the event itself.
        """
        if self.type == EventType.SYSEX:
            # Change only the length bits: the same flags field also carries
            # the timestamp type and time mode bits, which must survive
            other_flags = self.event.flags & ~EventLength.MASK.value
            self.event.flags = other_flags | EventLength.VARIABLE.value
        if self.is_variable_length_type:
            data = self.raw_data
            self.event.data.ext.len = len(data)
            self.event.data.ext.ptr = cvoidp()
            return bytes(self.event) + data
        return bytes(self.event)

    @classmethod
    def new(cls, etype: EventT, **kwargs):
        """
        Create new Event of the given type.

        The optional `data` keyword is stored as the event payload. All
        other keywords are written to the event data member associated with
        the event type in EVENT_TYPE_INFO.

        Raises ValueError if a keyword is not a field of that member or if
        keywords are given for an event type that carries no data member
        (ex: "reset").
        """
        data = kwargs.pop("data", b"")
        event = snd_seq_event()
        etype = to_event_type(etype)
        event.type = etype
        event_info = EVENT_TYPE_INFO.get(etype)
        # Some event types (tune request, reset, sensing) have no data member
        member_name = event_info[1] if event_info else None
        if member_name:
            member = getattr(event.data, member_name)
            allowed = {field[0] for field in member._fields_}
            not_allowed = set(kwargs) - allowed
            if not_allowed:
                raise ValueError(f"These fields are not allowed for {etype.name}: {', '.join(sorted(not_allowed))}")
            for key, value in kwargs.items():
                setattr(member, key, value)
        elif event_info and kwargs:
            raise ValueError(f"These fields are not allowed for {etype.name}: {', '.join(sorted(kwargs))}")
        result = cls(event)
        result.data = data
        return result

    @property
    def raw_data(self) -> bytes:
        """
        The payload as it is sent: SYSEX payloads that don't already start
        with the SYSEX start byte are wrapped in start/end bytes
        """
        if self.type == EventType.SYSEX and not self.data.startswith(self.SIXEX_START):
            return self.SIXEX_START + self.data + self.SIXEX_END
        return self.data

    @property
    def type(self) -> EventType:
        """The event type"""
        return EventType(self.event.type)

    @property
    def length_type(self) -> EventLength:
        """The length type extracted from the event flags"""
        return EventLength(self.event.flags & EventLength.MASK.value)

    @property
    def is_variable_length_type(self) -> bool:
        """True if the event flags mark a variable length event"""
        return self.length_type == EventLength.VARIABLE

    @property
    def timestamp_type(self) -> TimeStamp:
        """The timestamp type extracted from the event flags"""
        return TimeStamp(self.event.flags & TimeStamp.MASK.value)

    @property
    def time_mode(self) -> TimeMode:
        """The time mode extracted from the event flags"""
        return TimeMode(self.event.flags & TimeMode.MASK.value)

    @property
    def timestamp(self) -> Union[float, int]:
        """Time in seconds (REAL timestamp type) or in ticks otherwise"""
        # NOTE(review): this reads the `time` member of the event *data*;
        # confirm this is intended rather than the event's own time field
        if self.timestamp_type == TimeStamp.REAL:
            return self.event.data.time.time.tv_sec + self.event.data.time.time.tv_nsec * 1e-9
        return self.event.data.time.tick

    @property
    def note(self):
        """The `note` member of the event data"""
        return self.event.data.note

    @property
    def control(self):
        """The `control` member of the event data"""
        return self.event.data.control

    @property
    def connect(self):
        """The `connect` member of the event data"""
        return self.event.data.connect

    @property
    def queue_ctrl(self):
        """The `queue` member of the event data"""
        return self.event.data.queue

    @property
    def source(self) -> snd_seq_addr:
        """The source address"""
        return self.event.source

    @source.setter
    def source(self, address: FullPortAddress):
        self.event.source = to_address(address)

    @property
    def dest(self) -> snd_seq_addr:
        """The destination address"""
        return self.event.dest

    @dest.setter
    def dest(self, address: FullPortAddress):
        self.event.dest = to_address(address)

    @property
    def source_client_id(self) -> int:
        """The client ID of the source address"""
        return self.event.source.client

    @property
    def source_port_id(self) -> int:
        """The port ID of the source address"""
        return self.event.source.port

    @property
    def dest_client_id(self) -> int:
        """The client ID of the destination address"""
        return self.event.dest.client

    @property
    def dest_port_id(self) -> int:
        """The port ID of the destination address"""
        return self.event.dest.port

    @property
    def queue(self):
        """The event queue number"""
        return self.event.queue

    @queue.setter
    def queue(self, queue: int):
        self.event.queue = queue

    # short aliases for the source address
    client_id = source_client_id
    port_id = source_port_id
def event_stream(sequencer: Sequencer) -> Iterable[Event]:
    """Infinite stream of events coming from the given sequencer"""
    while True:
        # each read() returns the batch of events currently available
        batch = sequencer.read()
        yield from batch
async def async_event_stream(sequencer: Sequencer, maxsize: int = 10) -> AsyncIterable[Event]:
    """
    Infinite async stream of events coming from the given sequencer.

    Events are read by a callback registered with `add_reader_asyncio` on
    the sequencer file descriptor and handed over through a queue limited
    to *maxsize* events.
    """
    pending = asyncio.Queue(maxsize=maxsize)

    def feed():
        # put_nowait raises asyncio.QueueFull when the consumer lags behind
        for event in sequencer.read():
            pending.put_nowait(event)

    with add_reader_asyncio(sequencer.fileno(), feed):
        while True:
            event = await pending.get()
            yield event
if __name__ == "__main__":
    # Leave an open sequencer in `seq` when the module is run as a script
    seq = Sequencer()
    seq.open()