Coverage for linuxpy/usb/base.py: 0%

144 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2025-05-27 13:54 +0200

1import array 

2import enum 

3import errno 

4import pathlib 

5 

6from .. import device, ioctl, kernel 

7from ..ctypes import cast, cuint, cvoidp, pointer, u8, u32 

8from . import raw 

9 

# Root of the device tree and the sub-directory holding the USB bus nodes.
USB_DEV_PATH = pathlib.Path("/dev")
USB_DEV_TMPFS_PATH = USB_DEV_PATH.joinpath("bus", "usb")

# Fixed descriptor sizes, one constant per descriptor kind
# (presumably expressed in bytes — confirm against the USB specification).
DT_DEVICE_SIZE = 18
DT_CONFIG_SIZE = 9
DT_INTERFACE_SIZE = 9
DT_ENDPOINT_SIZE = 7
DT_ENDPOINT_AUDIO_SIZE = 9  # Audio extension
DT_HUB_NONVAR_SIZE = 7
DT_SS_ENDPOINT_COMPANION_SIZE = 6
DT_BOS_SIZE = 5
DT_DEVICE_CAPABILITY_SIZE = 3

22 

# Upper bound for an isochronous packet, selected from the running kernel
# version: 98304 from 5.2.0 on, 49152 from 3.10.0 on, 8192 for anything older.
# NOTE(review): presumably mirrors the limit enforced by the kernel itself
# for each of those generations — confirm.
MAX_ISO_PACKET = (
    98304
    if kernel.VERSION >= (5, 2, 0)
    else 49152
    if kernel.VERSION >= (3, 10, 0)
    else 8192
)

29 

30 

class TransferType(enum.IntEnum):
    """Integer codes identifying the four kinds of USB transfer."""

    CONTROL = 0
    ISOCHRONOUS = 1
    BULK = 2
    INTERRUPT = 3

36 

37 

class DescriptorType(enum.IntEnum):
    """Integer codes identifying the kind of a USB descriptor."""

    # Device, configuration, string, interface and endpoint level descriptors.
    DEVICE = 0x01
    CONFIG = 0x02
    STRING = 0x03
    INTERFACE = 0x04
    ENDPOINT = 0x05
    INTERFACE_ASSOCIATION = 0x0B
    BOS = 0x0F
    DEVICE_CAPABILITY = 0x10

    # Descriptors in the 0x21-0x24 range (HID and video related names).
    HID = 0x21
    REPORT = 0x22
    PHYSICAL = 0x23
    VIDEO_CONTROL = 0x24

    # Hub and SuperSpeed related descriptors.
    HUB = 0x29
    SUPERSPEED_HUB = 0x2A
    SS_ENDPOINT_COMPANION = 0x30

55 

56 

def set_configuration(fd, n):
    """Issue the SETCONFIGURATION ioctl on *fd* for configuration *n*.

    *n* is wrapped in a ``cuint`` before being handed over; the value
    produced by ``ioctl.ioctl`` is returned unchanged.
    """
    configuration = cuint(n)
    return ioctl.ioctl(fd, raw.IOC.SETCONFIGURATION, configuration)

60 

61 

def claim_interface(fd, n):
    """Issue the CLAIMINTERFACE ioctl on *fd* for interface number *n*.

    *n* is wrapped in a ``cuint`` before being handed over; the value
    produced by ``ioctl.ioctl`` is returned unchanged.
    """
    interface_number = cuint(n)
    return ioctl.ioctl(fd, raw.IOC.CLAIMINTERFACE, interface_number)

65 

66 

def active_configuration(fd):
    """Return the configuration value the device behind *fd* reports.

    A GET_CONFIGURATION control transfer (direction IN, one byte long) is
    sent through the CONTROL ioctl; the single byte it fills in is returned
    as a plain integer.
    """
    # One-byte destination the transfer writes its answer into.
    answer = u8(0)

    transfer = raw.usbdevfs_ctrltransfer()
    transfer.data = cast(pointer(answer), cvoidp)
    transfer.wLength = 1
    transfer.bRequestType = raw.Direction.IN
    transfer.bRequest = raw.Request.GET_CONFIGURATION
    transfer.wValue = 0
    transfer.wIndex = 0
    transfer.timeout = 1000  # presumably milliseconds — confirm

    ioctl.ioctl(fd, raw.IOC.CONTROL, transfer)
    return answer.value

79 

80 

def get_kernel_driver(fd, interface):
    """Return the name of the driver reported for *interface*, if any.

    The GETDRIVER ioctl is issued on *fd*. When it fails with ``ENODATA``
    the function returns ``None``; any other ``OSError`` is re-raised.
    Otherwise the ``driver`` field filled in by the ioctl is decoded and
    returned as ``str``.
    """
    query = raw.usbdevfs_getdriver()
    query.interface = interface
    try:
        ioctl.ioctl(fd, raw.IOC.GETDRIVER, query)
    except OSError as error:
        if error.errno != errno.ENODATA:
            raise
        return None
    return query.driver.decode()

91 

92 

def connect(fd, interface):
    """Send the CONNECT request for *interface* through the IOCTL ioctl.

    Nothing is returned; errors raised by ``ioctl.ioctl`` propagate.
    """
    request = raw.usbdevfs_ioctl()
    request.ioctl_code = raw.IOC.CONNECT
    request.ifno = interface
    ioctl.ioctl(fd, raw.IOC.IOCTL, request)

98 

99 

def disconnect(fd, interface):
    """Send the DISCONNECT request for *interface* through the IOCTL ioctl.

    Returns ``True`` when the ioctl succeeds and ``False`` when it fails
    with ``ENODATA``; every other ``OSError`` is re-raised.
    """
    request = raw.usbdevfs_ioctl()
    request.ioctl_code = raw.IOC.DISCONNECT
    request.ifno = interface
    try:
        ioctl.ioctl(fd, raw.IOC.IOCTL, request)
    except OSError as error:
        if error.errno != errno.ENODATA:
            raise
        return False
    else:
        return True

111 

112 

def capabilities(fd):
    """Return the capabilities of *fd* as a ``raw.Capability`` value.

    The GET_CAPABILITIES ioctl fills a 32-bit integer, which is then
    converted to ``raw.Capability``.
    """
    mask = u32()
    ioctl.ioctl(fd, raw.IOC.GET_CAPABILITIES, mask)
    return raw.Capability(mask.value)

117 

118 

def speed(fd):
    """Return the speed of *fd* as a ``raw.UsbDeviceSpeed`` value.

    The result of the argument-less GET_SPEED ioctl is converted directly.
    """
    return raw.UsbDeviceSpeed(ioctl.ioctl(fd, raw.IOC.GET_SPEED))

122 

123 

def bulk_read(fd, endpoint_address: int):
    """Submit one bulk URB on *endpoint_address* and reap it once completed.

    A 4096-byte scratch buffer is allocated, a BULK URB pointing at it is
    submitted with SUBMITURB, the call blocks in ``select`` until the
    descriptor reports activity, and REAPURBNDELAY is then issued.

    Returns whatever ``ioctl.ioctl`` returns for the REAPURBNDELAY request.
    NOTE(review): the received bytes stay in the local buffer and are not
    handed back to the caller — confirm whether that is intended.

    Errors raised by ``ioctl.ioctl`` for either request propagate.
    """
    import select  # kept function-local, as in the original implementation

    # The buffer must stay referenced until the URB has been reaped, because
    # the URB only stores its raw address.
    data = array.array("B", 4096 * b"\x00")
    addr, length = data.buffer_info()
    nbytes = length * data.itemsize

    urb = raw.usbdevfs_urb()
    urb.usercontext = 0
    urb.type = raw.URBType.BULK
    urb.stream_id = 0
    urb.endpoint = endpoint_address
    urb.buffer = addr
    urb.buffer_length = nbytes
    ioctl.ioctl(fd, raw.IOC.SUBMITURB, urb)

    # usbfs reports a completed URB as *writability* (POLLOUT), never as
    # readability, so the descriptor has to be in the write set or this wait
    # would not wake up on normal completion. It stays in the read and
    # exceptional sets so the conditions that woke the previous version
    # (e.g. hang-up) still do.
    select.select((fd,), (fd,), (fd,))

    reply = raw.usbdevfs_urb()
    return ioctl.ioctl(fd, raw.IOC.REAPURBNDELAY, reply)

143 

144 

class BaseDevice(device.BaseDevice):
    """Object-oriented front end to the module-level USB helpers.

    Every method forwards to the module-level function of the same name,
    passing ``self.fileno()`` as the file descriptor. Subclasses have to
    provide ``bus_number`` and ``device_address``.
    """

    def __repr__(self):
        return f"{type(self).__name__}(bus={self.bus_number}, address={self.device_address})"

    @property
    def bus_number(self):
        """Bus number of the device; must be supplied by subclasses."""
        raise NotImplementedError

    @property
    def device_address(self):
        """Address of the device on its bus; must be supplied by subclasses."""
        raise NotImplementedError

    @property
    def session_id(self):
        """Bus number shifted left by 8 bits, OR-ed with the device address."""
        return (self.bus_number << 8) | self.device_address

    @property
    def active_configuration(self):
        """Result of ``active_configuration`` for this device's descriptor."""
        return active_configuration(self.fileno())

    @property
    def capabilities(self):
        """Result of ``capabilities`` for this device's descriptor."""
        return capabilities(self.fileno())

    @property
    def speed(self):
        """Result of ``speed`` for this device's descriptor."""
        return speed(self.fileno())

    def get_kernel_driver(self, interface):
        """Return the driver name reported for *interface*, or ``None``."""
        return get_kernel_driver(self.fileno(), interface)

    def connect(self, interface):
        """Send the CONNECT request for *interface*."""
        connect(self.fileno(), interface)

    def disconnect(self, interface):
        """Send the DISCONNECT request for *interface*.

        Returns the helper's result: ``True`` on success, ``False`` when the
        request failed with ``ENODATA``. The result used to be dropped here,
        leaving callers unable to tell the two outcomes apart.
        """
        return disconnect(self.fileno(), interface)

    def set_configuration(self, n):
        """Select configuration *n* and return the ioctl result."""
        return set_configuration(self.fileno(), n)

    def claim_interface(self, n):
        """Claim interface number *n* and return the ioctl result."""
        return claim_interface(self.fileno(), n)

    def bulk_read(self, endpoint):
        """Run ``bulk_read`` on ``endpoint.address`` and return its result."""
        return bulk_read(self.fileno(), endpoint.address)