Coverage for linuxpy/usb/usbsys.py: 0%

329 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2025-05-27 13:54 +0200

1# 

2# This file is part of the linuxpy project 

3# 

4# Copyright (c) 2023 Tiago Coutinho 

5# Distributed under the GPLv3 license. See LICENSE for more info. 

6 

7import enum 

8import functools 

9import pathlib 

10 

11from .. import sysfs 

12from ..ctypes import sizeof 

13from ..util import bcd_version, make_find 

14from . import raw 

15from .base import USB_DEV_TMPFS_PATH, BaseDevice, DescriptorType 

16from .ids.klass import klass as classes 

17 

18Speed = raw.UsbDeviceSpeed 

19 

20 

21class Removable(enum.Enum): 

22 UNKNOWN = "unknown" 

23 FIXED = "fixed" 

24 REMOVABLE = "removable" 

25 

26 

27def _attr_getter(filename, decode, alternative, mode="r"): 

28 def getter(self): 

29 path = self.syspath / filename 

30 if path.exists(): 

31 with path.open(mode) as fobj: 

32 return decode(fobj.read()) 

33 elif alternative: 

34 return alternative(self) 

35 

36 return getter 

37 

38 

39def cached_attr(filename, decode=str.strip, alternative=None, mode="r"): 

40 return functools.cached_property(_attr_getter(filename, decode, alternative, mode)) 

41 

42 

43def attr(filename, decode=str.strip, alternative=None, mode="r"): 

44 return property(_attr_getter(filename, decode, alternative, mode)) 

45 

46 

47class Attr: 

48 def __get__(self, obj, klass=None): 

49 pass 

50 

51 

52def _decode_speed(speed): 

53 speed = int(speed) 

54 if speed == 1: 

55 return Speed.LOW 

56 elif speed == 12: 

57 return Speed.FULL 

58 elif speed == 480: 

59 return Speed.HIGH 

60 elif speed == 5_000: 

61 return Speed.SUPER 

62 elif speed == 10_000: 

63 return Speed.SUPER_PLUS 

64 return Speed.UNKNOWN 

65 

66 

67def _manufacturer_alternative(device): 

68 return get_vendor_name(device.vendor_id) 

69 

70 

71def _product_alternative(device): 

72 return get_product_name(device.vendor_id, device.product_id) 

73 

74 

75def usb_video_control_descriptor(data, offset=0): 

76 vc = data[offset + 2] 

77 if vc == raw.VideoControl.HEADER: 

78 return raw.uvc_header_descriptor.from_buffer_copy(data, offset) 

79 elif vc == raw.VideoControl.INPUT_TERMINAL: 

80 return raw.uvc_input_terminal_descriptor.from_buffer_copy(data, offset) 

81 elif vc == raw.VideoControl.OUTPUT_TERMINAL: 

82 return raw.uvc_output_terminal_descriptor.from_buffer_copy(data, offset) 

83 elif vc == raw.VideoControl.EXTENSION_UNIT: 

84 return raw.uvc_extension_unit_descriptor.from_buffer_copy(data, offset) 

85 elif vc == raw.VideoControl.PROCESSING_UNIT: 

86 return raw.uvc_processing_unit_descriptor.from_buffer_copy(data, offset) 

87 elif vc == raw.VideoControl.SELECTOR_UNIT: 

88 return raw.uvc_selector_unit_descriptor.from_buffer_copy(data, offset) 

89 if offset: 

90 return data[offset:] 

91 return data 

92 

93 

94def descriptor_node(klass): 

95 klass._children = None 

96 klass._parent = None 

97 klass._device = None 

98 

99 def get_parent(self): 

100 return self._parent 

101 

102 def set_parent(self, parent): 

103 self._parent = parent 

104 

105 def get_device(self): 

106 return self._device 

107 

108 def set_device(self, device): 

109 self._device = device 

110 

111 def get_children(self): 

112 if self._children is None: 

113 self._children = [] 

114 return self._children 

115 

116 def getitem(self, key): 

117 return self.children[key] 

118 

119 def size(self): 

120 return len(self.children) 

121 

122 def iterator(self): 

123 return iter(self.children) 

124 

125 def get_type(self) -> DescriptorType: 

126 try: 

127 return DescriptorType(self.bDescriptorType) 

128 except ValueError: 

129 return self.bDescriptorType 

130 

131 klass.parent = property(get_parent, set_parent) 

132 klass.device = property(get_device, set_device) 

133 klass.children = property(get_children) 

134 klass.type = property(get_type) 

135 klass.length = property(lambda self: self.bLength) 

136 klass.__getitem__ = getitem 

137 klass.__iter__ = iterator 

138 klass.__len__ = size 

139 return klass 

140 

141 

142def get_vendor(vendor_id: int) -> dict: 

143 from linuxpy.usb.ids.vendor import vendor 

144 

145 return vendor.get(vendor_id, {}) 

146 

147 

148def get_vendor_name(vendor_id: int) -> str: 

149 return get_vendor(vendor_id).get("name", "") 

150 

151 

152def get_products(vendor_id: int) -> dict: 

153 return get_vendor(vendor_id).get("children", {}) 

154 

155 

156def get_product(vendor_id: int, product_id: int) -> dict: 

157 return get_products(vendor_id).get(product_id, {}) 

158 

159 

160def get_product_name(vendor_id: int, product_id: int) -> str: 

161 return get_product(vendor_id, product_id).get("name", "") 

162 

163 

164def get_class(class_id: int) -> dict: 

165 return classes.get(class_id, {}) 

166 

167 

168def get_class_name(class_id: int) -> str: 

169 return get_class(class_id).get("name", "") 

170 

171 

172def get_subclasses(class_id: int) -> dict: 

173 return get_class(class_id).get("children", {}) 

174 

175 

176def get_subclass(class_id: int, subclass_id: int) -> dict: 

177 return get_subclasses(class_id).get(subclass_id, {}) 

178 

179 

180def get_subclass_name(class_id: int, subclass_id: int) -> str: 

181 return get_subclass(class_id, subclass_id).get("name", "") 

182 

183 

184def get_protocols(class_id: int, subclass_id: int) -> dict: 

185 return get_subclass(class_id, subclass_id).get("children", {}) 

186 

187 

188def get_protocol(class_id: int, subclass_id: int, protocol_id: int) -> dict: 

189 return get_protocols(class_id, subclass_id).get(protocol_id, {}) 

190 

191 

192def get_protocol_name(class_id, subclass_id, protocol_id): 

193 return get_protocol(class_id, subclass_id, protocol_id).get("name", "") 

194 

195 

196@descriptor_node 

197class DeviceDescriptor(raw.usb_device_descriptor): 

198 @property 

199 def usb_version(self): 

200 return bcd_version(self.bcdUSB) 

201 

202 @property 

203 def version(self): 

204 return bcd_version(self.bcdDevice) 

205 

206 @property 

207 def class_id(self): 

208 return self.bDeviceClass 

209 

210 @property 

211 def subclass_id(self): 

212 return self.bDeviceSubClass 

213 

214 @property 

215 def protocol_id(self): 

216 return self.bDeviceProtocol 

217 

218 @property 

219 def vendor_id(self): 

220 return self.idVendor 

221 

222 @property 

223 def vendor_name(self): 

224 return get_vendor_name(self.vendor_id) 

225 

226 @property 

227 def product_id(self): 

228 return self.idProduct 

229 

230 @property 

231 def product_name(self): 

232 return get_product_name(self.vendor_id, self.product_id) 

233 

234 @property 

235 def nb_configurations(self): 

236 return self.bNumConfigurations 

237 

238 @property 

239 def device_class(self) -> raw.Class: 

240 return raw.Class(self.class_id) 

241 

242 @property 

243 def class_name(self) -> str: 

244 return get_class_name(self.class_id) 

245 

246 @property 

247 def subclass_name(self) -> str: 

248 return get_subclass_name(self.class_id, self.subclass_id) 

249 

250 @property 

251 def protocol_name(self) -> str: 

252 return get_protocol_name(self.class_id, self.subclass_id, self.protocol_id) 

253 

254 def __str__(self): 

255 return f"""\ 

256type {self.type.value:6} {self.type.name} 

257length {self.length:6} bytes 

258class {self.class_id:6} {self.class_name} 

259subclass {self.subclass_id:6} {self.subclass_name} 

260protocol {self.protocol_id:6} {self.protocol_name} 

261max packet size {self.bMaxPacketSize0:6} 

262version {self.version:>6} 

263usb version {self.usb_version:>6} 

264vendor 0x{self.vendor_id:4x} {self.vendor_name} 

265product 0x{self.product_id:4x} {self.product_name} 

266manufacturer {self.iManufacturer:6} 

267product {self.iProduct:6} 

268serial number {self.iSerialNumber:6} 

269nb. configs {self.nb_configurations:6}""" 

270 

271 

272@descriptor_node 

273class Configuration(raw.usb_config_descriptor): 

274 @property 

275 def max_power(self) -> int: 

276 """ 

277 Maximum power consumption in this specific configuration when the 

278 device is fully operational (mA). 

279 """ 

280 return self.bMaxPower * 2 

281 

282 @property 

283 def is_self_powered(self) -> bool: 

284 return bool(self.bmAttributes & 0b100000) 

285 

286 @property 

287 def has_remote_wakeup(self) -> bool: 

288 return bool(self.bmAttributes & 0b10000) 

289 

290 

291@descriptor_node 

292class Interface(raw.usb_interface_descriptor): 

293 @property 

294 def class_id(self): 

295 return self.bInterfaceClass 

296 

297 @property 

298 def subclass_id(self): 

299 return self.bInterfaceSubClass 

300 

301 @property 

302 def protocol_id(self): 

303 return self.bInterfaceProtocol 

304 

305 @property 

306 def interface_class(self) -> raw.Class: 

307 return raw.Class(self.class_id) 

308 

309 @property 

310 def interface_subclass(self): 

311 if self.interface_class == raw.Class.VIDEO: 

312 return raw.VideoSubClass(self.subclass_id) 

313 return self.subclass_id 

314 

315 @property 

316 def class_name(self) -> str: 

317 return get_class_name(self.class_id) 

318 

319 @property 

320 def subclass_name(self) -> str: 

321 return get_subclass_name(self.class_id, self.subclass_id) 

322 

323 @property 

324 def protocol_name(self) -> str: 

325 return get_protocol_name(self.class_id, self.subclass_id, self.protocol_id) 

326 

327 

328@descriptor_node 

329class Endpoint(raw.usb_endpoint_descriptor): 

330 DIRECTION_MASK = 0x80 

331 TRANSFER_TYPE_MASK = 0b11 

332 

333 @property 

334 def address(self): 

335 return self.bEndpointAddress 

336 

337 @property 

338 def direction(self): 

339 return raw.Direction(self.address & self.DIRECTION_MASK) 

340 

341 @property 

342 def transfer_type(self): 

343 return raw.EndpointTransferType(self.bmAttributes & self.TRANSFER_TYPE_MASK) 

344 

345 def read(self): 

346 return self.device.read() 

347 

348 def __repr__(self): 

349 name = type(self).__name__ 

350 return f"{name} address=0x{self.address:X} type={self.transfer_type.name} direction={self.direction.name}" 

351 

352 

353class HID(raw.usb_hid_descriptor): 

354 @property 

355 def version(self): 

356 return bcd_version(self.bcdHID) 

357 

358 

359DESCRIPTOR_HIERARCHY = { 

360 DescriptorType.DEVICE: None, 

361 DescriptorType.CONFIG: DescriptorType.DEVICE, 

362 DescriptorType.INTERFACE: DescriptorType.CONFIG, 

363 DescriptorType.ENDPOINT: DescriptorType.INTERFACE, 

364 DescriptorType.INTERFACE_ASSOCIATION: DescriptorType.CONFIG, 

365 DescriptorType.VIDEO_CONTROL: DescriptorType.INTERFACE, 

366 DescriptorType.HID: DescriptorType.INTERFACE, 

367} 

368 

369 

370DESCRIPTOR_STRUCT_MAP = { 

371 DescriptorType.DEVICE: DeviceDescriptor, 

372 DescriptorType.CONFIG: Configuration, 

373 DescriptorType.STRING: raw.usb_string_descriptor, 

374 DescriptorType.INTERFACE: Interface, 

375 DescriptorType.INTERFACE_ASSOCIATION: raw.usb_interface_assoc_descriptor, 

376 DescriptorType.ENDPOINT: Endpoint, 

377 DescriptorType.HID: HID, 

378} 

379 

380 

381DESCRIPTOR_CALL_MAP = { 

382 DescriptorType.VIDEO_CONTROL: usb_video_control_descriptor, 

383} 

384 

385 

386def descs(data): 

387 size = len(data) 

388 ptr = 0 

389 while ptr < (size - 2): 

390 length, dtype = data[ptr], data[ptr + 1] 

391 try: 

392 name = DescriptorType(dtype).name 

393 except ValueError: 

394 name = str(dtype) 

395 print(name, dtype, length) 

396 ptr += length 

397 

398 

399def _iter_decode_descriptors(data): 

400 offset, size = 0, len(data) 

401 while offset < size: 

402 length = data[offset] 

403 dtype = data[offset + 1] 

404 dclass = DESCRIPTOR_STRUCT_MAP.get(dtype) 

405 if dclass is None: 

406 dfunc = DESCRIPTOR_CALL_MAP.get(dtype) 

407 if dfunc is not None: 

408 yield dfunc(data, offset) 

409 else: 

410 extra = sizeof(dclass) - length 

411 if extra: 

412 local = data[offset : offset + length] + extra * b"\x00" 

413 descriptor = dclass.from_buffer_copy(local) 

414 else: 

415 descriptor = dclass.from_buffer_copy(data, offset) 

416 yield descriptor 

417 offset += length 

418 

419 

420def iter_descriptors(data): 

421 last_type = {} 

422 for item in _iter_decode_descriptors(data): 

423 if isinstance(item, bytes): 

424 continue 

425 dtype = item.bDescriptorType 

426 last_type[dtype] = item 

427 parent_type = DESCRIPTOR_HIERARCHY[dtype] 

428 if parent_type is not None: 

429 parent = last_type[parent_type] 

430 parent.children.append(item) 

431 item.parent = parent 

432 yield item 

433 

434 

435def build_descriptor(data): 

436 return list(iter_descriptors(data))[0] 

437 

438 

439def find_descriptor(desc, find_all=False, custom_match=None, **args): 

440 def desc_iter(**kwargs): 

441 for d in desc: 

442 tests = (val == getattr(d, key) for key, val in kwargs.items()) 

443 if all(tests) and (custom_match is None or custom_match(d)): 

444 yield d 

445 

446 result = desc_iter(**args) 

447 return result if find_all else next(result, None) 

448 

449 

450class Device(BaseDevice): 

451 descriptors = cached_attr("descriptors", build_descriptor, mode="rb") 

452 bus_number = cached_attr("busnum", int) 

453 device_number = cached_attr("devnum", int) 

454 manufacturer = cached_attr("manufacturer", alternative=_manufacturer_alternative) 

455 product = cached_attr("product", alternative=_product_alternative) 

456 nb_interfaces = cached_attr("bNumInterfaces", int) 

457 nb_configurations = cached_attr("bNumConfigurations", int) 

458 speed = cached_attr("speed", _decode_speed) 

459 active_configuration = attr("bConfigurationValue", int) 

460 device_class_id = cached_attr("bDeviceClass", int) 

461 device_subclass_id = cached_attr("bDeviceSubClass", int) 

462 device_protocol_id = cached_attr("bDeviceProtocol", int) 

463 product_id = cached_attr("idProduct", lambda text: int(text, 16)) 

464 vendor_id = cached_attr("idVendor", lambda text: int(text, 16)) 

465 removable = cached_attr("removable", lambda text: Removable(text.strip())) 

466 

467 def __init__(self, name_or_file, **kwargs): 

468 self.syspath = pathlib.Path(name_or_file) 

469 dev_name = USB_DEV_TMPFS_PATH / f"{self.bus_number:03d}" / f"{self.device_number:03d}" 

470 super().__init__(dev_name, **kwargs) 

471 

472 def __repr__(self): 

473 return f"{type(self).__name__}(bus={self.bus_number}, device={self.device_number}, syspath={self.syspath.stem})" 

474 

475 def _on_open(self): 

476 self.descriptor = build_descriptor(self._fobj.read()) 

477 

478 

479def is_available(): 

480 return sysfs.is_available() 

481 

482 

483def iter_paths(): 

484 for path in sysfs.DEVICE_PATH.iterdir(): 

485 name = path.name 

486 if (not name[0].isdigit() and not name.startswith("usb")) or ":" in name: 

487 continue 

488 yield path 

489 

490 

491def iter_devices(): 

492 for path in iter_paths(): 

493 yield Device(path) 

494 

495 

496find = make_find(iter_devices, needs_open=False) 

497 

498 

499def lsusb(): 

500 for dev in iter_devices(): 

501 print( 

502 f"Bus {dev.bus_number:03d} Device {dev.device_number:03d}: ID {dev.vendor_id:04x}:{dev.product_id:04x} {dev.manufacturer} {dev.product}" 

503 ) 

504 

505 

506if __name__ == "__main__": 

507 lsusb()