Coverage for linuxpy/usb/base.py: 0%
144 statements
« prev ^ index » next coverage.py v7.6.8, created at 2025-05-27 13:54 +0200
« prev ^ index » next coverage.py v7.6.8, created at 2025-05-27 13:54 +0200
1import array
2import enum
3import errno
4import pathlib
6from .. import device, ioctl, kernel
7from ..ctypes import cast, cuint, cvoidp, pointer, u8, u32
8from . import raw
10USB_DEV_PATH = pathlib.Path("/dev")
11USB_DEV_TMPFS_PATH = USB_DEV_PATH / "bus" / "usb"
13DT_DEVICE_SIZE = 18
14DT_CONFIG_SIZE = 9
15DT_INTERFACE_SIZE = 9
16DT_ENDPOINT_SIZE = 7
17DT_ENDPOINT_AUDIO_SIZE = 9 # Audio extension
18DT_HUB_NONVAR_SIZE = 7
19DT_SS_ENDPOINT_COMPANION_SIZE = 6
20DT_BOS_SIZE = 5
21DT_DEVICE_CAPABILITY_SIZE = 3
23if kernel.VERSION >= (5, 2, 0):
24 MAX_ISO_PACKET = 98304
25elif kernel.VERSION >= (3, 10, 0):
26 MAX_ISO_PACKET = 49152
27else:
28 MAX_ISO_PACKET = 8192
31class TransferType(enum.IntEnum):
32 CONTROL = 0x0
33 ISOCHRONOUS = 0x1
34 BULK = 0x2
35 INTERRUPT = 0x3
38class DescriptorType(enum.IntEnum):
39 DEVICE = 0x1
40 CONFIG = 0x2
41 STRING = 0x3
42 INTERFACE = 0x4
43 ENDPOINT = 0x5
44 INTERFACE_ASSOCIATION = 0x0B
45 BOS = 0x0F
46 DEVICE_CAPABILITY = 0x10
47 HID = 0x21
48 REPORT = 0x22
49 PHYSICAL = 0x23
50 VIDEO_CONTROL = 0x24
52 HUB = 0x29
53 SUPERSPEED_HUB = 0x2A
54 SS_ENDPOINT_COMPANION = 0x30
57def set_configuration(fd, n):
58 n = cuint(n)
59 return ioctl.ioctl(fd, raw.IOC.SETCONFIGURATION, n)
62def claim_interface(fd, n):
63 n = cuint(n)
64 return ioctl.ioctl(fd, raw.IOC.CLAIMINTERFACE, n)
67def active_configuration(fd):
68 result = u8(0)
69 ctrl = raw.usbdevfs_ctrltransfer()
70 ctrl.bRequestType = raw.Direction.IN
71 ctrl.bRequest = raw.Request.GET_CONFIGURATION
72 ctrl.wValue = 0
73 ctrl.wIndex = 0
74 ctrl.wLength = 1
75 ctrl.timeout = 1000
76 ctrl.data = cast(pointer(result), cvoidp)
77 ioctl.ioctl(fd, raw.IOC.CONTROL, ctrl)
78 return result.value
81def get_kernel_driver(fd, interface):
82 result = raw.usbdevfs_getdriver()
83 result.interface = interface
84 try:
85 ioctl.ioctl(fd, raw.IOC.GETDRIVER, result)
86 except OSError as error:
87 if error.errno == errno.ENODATA:
88 return
89 raise
90 return result.driver.decode()
93def connect(fd, interface):
94 command = raw.usbdevfs_ioctl()
95 command.ifno = interface
96 command.ioctl_code = raw.IOC.CONNECT
97 ioctl.ioctl(fd, raw.IOC.IOCTL, command)
100def disconnect(fd, interface):
101 command = raw.usbdevfs_ioctl()
102 command.ifno = interface
103 command.ioctl_code = raw.IOC.DISCONNECT
104 try:
105 ioctl.ioctl(fd, raw.IOC.IOCTL, command)
106 except OSError as error:
107 if error.errno == errno.ENODATA:
108 return False
109 raise
110 return True
113def capabilities(fd):
114 caps = u32()
115 ioctl.ioctl(fd, raw.IOC.GET_CAPABILITIES, caps)
116 return raw.Capability(caps.value)
119def speed(fd):
120 result = ioctl.ioctl(fd, raw.IOC.GET_SPEED)
121 return raw.UsbDeviceSpeed(result)
124def bulk_read(fd, endpoint_address: int):
125 data = array.array("B", 4096 * b"\x00")
126 addr, length = data.buffer_info()
127 nbytes = length * data.itemsize
129 urb = raw.usbdevfs_urb()
130 urb.usercontext = 0
131 urb.type = raw.URBType.BULK
132 urb.stream_id = 0
133 urb.endpoint = endpoint_address
134 urb.buffer = addr
135 urb.buffer_length = nbytes
136 ioctl.ioctl(fd, raw.IOC.SUBMITURB, urb)
138 import select
140 r, _, e = select.select((fd,), (), (fd,))
141 reply = raw.usbdevfs_urb()
142 return ioctl.ioctl(fd, raw.IOC.REAPURBNDELAY, reply)
145class BaseDevice(device.BaseDevice):
146 def __repr__(self):
147 return f"{type(self).__name__}(bus={self.bus_number}, address={self.device_address})"
149 @property
150 def bus_number(self):
151 raise NotImplementedError
153 @property
154 def device_address(self):
155 raise NotImplementedError
157 @property
158 def session_id(self):
159 return (self.bus_number << 8) | self.device_address
161 @property
162 def active_configuration(self):
163 return active_configuration(self.fileno())
165 @property
166 def capabilities(self):
167 return capabilities(self.fileno())
169 @property
170 def speed(self):
171 return speed(self.fileno())
173 def get_kernel_driver(self, interface):
174 return get_kernel_driver(self.fileno(), interface)
176 def connect(self, interface):
177 connect(self.fileno(), interface)
179 def disconnect(self, interface):
180 disconnect(self.fileno(), interface)
182 def set_configuration(self, n):
183 return set_configuration(self.fileno(), n)
185 def claim_interface(self, n):
186 return claim_interface(self.fileno(), n)
188 def bulk_read(self, endpoint):
189 return bulk_read(self.fileno(), endpoint.address)