Coverage for linuxpy/video/qt.py: 46%

711 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2026-02-19 15:11 +0100

1# 

2# This file is part of the linuxpy project 

3# 

4# Copyright (c) 2023 Tiago Coutinho 

5# Distributed under the GPLv3 license. See LICENSE for more info. 

6 

7""" 

8Qt helpers for V4L2 (Video 4 Linux 2) subsystem. 

9 

10You'll need to install linuxpy qt optional dependencies (ex: `$pip install linuxpy[qt]`) 

11""" 

12 

13import collections 

14import contextlib 

15import logging 

16import select 

17 

18from qtpy import QtCore, QtGui, QtWidgets 

19 

20from linuxpy.video.device import ( 

21 BaseControl, 

22 BufferType, 

23 ControlType, 

24 Device, 

25 EventControlChange, 

26 EventType, 

27 Frame, 

28 FrameSizeType, 

29 MemoryMap, 

30 PixelFormat, 

31 VideoCapture, 

32) 

33 

34log = logging.getLogger(__name__) 

35 

36 

37def stream(epoll): 

38 while True: 

39 yield from epoll.poll() 

40 

41 

42@contextlib.contextmanager 

43def signals_blocked(widget: QtWidgets.QWidget): 

44 if widget.signalsBlocked(): 

45 yield 

46 else: 

47 widget.blockSignals(True) 

48 try: 

49 yield 

50 finally: 

51 widget.blockSignals(False) 

52 

53 

54class Dispatcher: 

55 def __init__(self): 

56 self.epoll = select.epoll() 

57 self.cameras = {} 

58 self._task = None 

59 

60 def ensure_task(self): 

61 if self._task is None: 

62 self._task = QtCore.QThread() 

63 self._task.run = self.loop 

64 self._task.start() 

65 self._task.finished.connect(self.on_quit) 

66 return self._task 

67 

68 def on_quit(self): 

69 for fd in self.cameras: 

70 self.epoll.unregister(fd) 

71 self.cameras = {} 

72 

73 def register(self, camera, type): 

74 self.ensure_task() 

75 fd = camera.device.fileno() 

76 event_mask = select.EPOLLHUP 

77 if type == "all" or type == "frame": 

78 event_mask |= select.EPOLLIN 

79 if type == "all" or type == "control": 

80 event_mask |= select.EPOLLPRI 

81 registered = fd in self.cameras 

82 self.cameras[fd] = camera 

83 if registered: 

84 self.epoll.modify(fd, event_mask) 

85 else: 

86 self.epoll.register(fd, event_mask) 

87 

88 def unregister(self, camera): 

89 fd = camera.device.fileno() 

90 camera = self.cameras.pop(fd, None) 

91 if camera: 

92 self.epoll.unregister(fd) 

93 

94 def loop(self): 

95 errno = 0 

96 for fd, event_type in stream(self.epoll): 

97 camera = self.cameras[fd] 

98 if event_type & select.EPOLLHUP: 

99 print("Unregister!", fd) 

100 self.epoll.unregister(fd) 

101 else: 

102 if event_type & select.EPOLLPRI: 

103 camera.handle_event() 

104 if event_type & select.EPOLLIN: 

105 camera.handle_frame() 

106 if event_type & select.EPOLLERR: 

107 errno += 1 

108 print("ERROR", errno) 

109 

110 

111dispatcher = Dispatcher() 

112 

113 

114class QimageMemory(MemoryMap): 

115 def open(self): 

116 super().open() 

117 self.data = None 

118 self.opencv = None 

119 format = self.format 

120 if format.pixel_format in FORMAT_MAP: 

121 self.data = bytearray(format.size) 

122 elif format.pixel_format in OPENCV_FORMATS: 

123 self.data = bytearray(format.size) 

124 self.opencv = create_opencv_buffer(format) 

125 

126 def raw_read(self, into=None): 

127 frame = super().raw_read(self.data) 

128 qimage = None 

129 format = self.format 

130 if self.data is not None: 

131 if self.opencv is None: 

132 data = self.data 

133 qformat = FORMAT_MAP.get(format.pixel_format) 

134 else: 

135 data = self.opencv 

136 opencv_frame_to_rgb24(frame, data) 

137 qformat = QtGui.QImage.Format.Format_RGB888 

138 qimage = QtGui.QImage(data, format.width, format.height, qformat) 

139 frame.user_data = qimage 

140 return frame 

141 

142 

143class QCamera(QtCore.QObject): 

144 frameChanged = QtCore.Signal(object) 

145 stateChanged = QtCore.Signal(str) 

146 

147 def __init__(self, device: Device): 

148 super().__init__() 

149 self.device = device 

150 self.capture = VideoCapture(device, buffer_type=QimageMemory) 

151 self._stop = False 

152 self._stream = None 

153 self._state = "stopped" 

154 self.controls = {} 

155 

156 def __enter__(self): 

157 self.device.open() 

158 dispatcher.register(self, "control") 

159 return self 

160 

161 def __exit__(self, *exc): 

162 dispatcher.unregister(self) 

163 self.device.close() 

164 

165 @classmethod 

166 def from_id(cls, did: int, **kwargs): 

167 device = Device.from_id(did, **kwargs) 

168 return cls(device) 

169 

170 @classmethod 

171 def from_path(cls, path, **kwargs): 

172 device = Device(path, **kwargs) 

173 return cls(device) 

174 

175 def handle_frame(self): 

176 if self._stream is None: 

177 return 

178 frame = next(self._stream) 

179 self.frameChanged.emit(frame) 

180 

181 def handle_event(self): 

182 event = self.device.deque_event() 

183 if event.type == EventType.CTRL: 

184 evt = event.u.ctrl 

185 if not EventControlChange.VALUE & evt.changes: 

186 # Skip non value changes (flags, ranges, dimensions) 

187 log.info("Skip event %s", event) 

188 return 

189 ctrl, controls = self.controls[event.id] 

190 value = None if evt.type == ControlType.BUTTON else ctrl.value 

191 for control in controls: 

192 control.valueChanged.emit(value) 

193 

194 def setState(self, state): 

195 self._state = state 

196 self.stateChanged.emit(state) 

197 

198 def state(self): 

199 return self._state 

200 

201 def start(self): 

202 if self._state != "stopped": 

203 raise RuntimeError(f"Cannot start when camera is {self._state}") 

204 self.setState("running") 

205 self.capture.open() 

206 self._stream = iter(self.capture) 

207 dispatcher.register(self, "all") 

208 

209 def pause(self): 

210 if self._state != "running": 

211 raise RuntimeError(f"Cannot pause when camera is {self._state}") 

212 dispatcher.register(self, "control") 

213 self.setState("paused") 

214 

215 def resume(self): 

216 if self._state != "paused": 

217 raise RuntimeError(f"Cannot resume when camera is {self._state}") 

218 dispatcher.register(self, "all") 

219 self.setState("running") 

220 

221 def stop(self): 

222 if self._state == "stopped": 

223 raise RuntimeError(f"Cannot stop when camera is {self._state}") 

224 dispatcher.register(self, "control") 

225 if self._stream is not None: 

226 self._stream.close() 

227 self._stream = None 

228 self.capture.close() 

229 self.setState("stopped") 

230 

231 def qcontrol(self, name_or_id: str | int): 

232 ctrl = self.device.controls[name_or_id] 

233 qctrl = QControl(ctrl) 

234 if (info := self.controls.get(ctrl.id)) is None: 

235 info = ctrl, [] 

236 self.controls[ctrl.id] = info 

237 info[1].append(qctrl) 

238 self.device.subscribe_event(EventType.CTRL, ctrl.id) 

239 return qctrl 

240 

241 

242class QVideoStream(QtCore.QObject): 

243 imageChanged = QtCore.Signal(object) 

244 

245 def __init__(self, camera: QCamera | None = None): 

246 super().__init__() 

247 self.camera = None 

248 self.set_camera(camera) 

249 

250 def on_frame(self, frame): 

251 qimage = frame.user_data or frame_to_qimage(frame) 

252 self.imageChanged.emit(qimage) 

253 

254 def set_camera(self, camera: QCamera | None = None): 

255 if self.camera: 

256 self.camera.frameChanged.disconnect(self.on_frame) 

257 self.camera = camera 

258 if self.camera: 

259 self.camera.frameChanged.connect(self.on_frame) 

260 

261 

262class QStrValidator(QtGui.QValidator): 

263 def __init__(self, ctrl): 

264 super().__init__() 

265 self.ctrl = ctrl 

266 

267 def validate(self, text, pos): 

268 size = len(text) 

269 if size < self.ctrl.minimum: 

270 return QtGui.QValidator.State.Intermediate, text, pos 

271 if size > self.ctrl.maximum: 

272 return QtGui.QValidator.State.Invalid, text, pos 

273 return QtGui.QValidator.State.Acceptable, text, pos 

274 

275 

276def hbox(*widgets): 

277 panel = QtWidgets.QWidget() 

278 layout = QtWidgets.QHBoxLayout(panel) 

279 layout.setContentsMargins(0, 0, 0, 0) 

280 for widget in widgets: 

281 layout.addWidget(widget) 

282 return panel 

283 

284 

285def _reset_control(control): 

286 reset_button = QtWidgets.QToolButton() 

287 reset_button.setIcon(QtGui.QIcon.fromTheme(QtGui.QIcon.ThemeIcon.EditClear)) 

288 reset_button.clicked.connect(control.ctrl.set_to_default) 

289 return reset_button 

290 

291 

292def menu_control(control): 

293 ctrl = control.ctrl 

294 combo = QtWidgets.QComboBox() 

295 idx_map = {} 

296 key_map = {} 

297 for idx, (key, name) in enumerate(ctrl.data.items()): 

298 combo.addItem(str(name), key) 

299 idx_map[idx] = key 

300 key_map[key] = idx 

301 combo.setCurrentIndex(key_map[ctrl.value]) 

302 control.valueChanged.connect(lambda key: combo.setCurrentIndex(key_map[key])) 

303 combo.textActivated.connect(lambda txt: control.setValue(idx_map[combo.currentIndex()])) 

304 reset_button = _reset_control(control) 

305 reset_button.clicked.connect(lambda: combo.setCurrentIndex(key_map[ctrl.default])) 

306 return hbox(combo, reset_button) 

307 

308 

309def text_control(control): 

310 ctrl = control.ctrl 

311 line_edit = QtWidgets.QLineEdit() 

312 validator = QStrValidator(ctrl) 

313 line_edit.setValidator(validator) 

314 line_edit.setText(ctrl.value) 

315 control.valueChanged.connect(line_edit.setText) 

316 line_edit.editingFinished.connect(lambda: control.setValue(line_edit.text())) 

317 reset_button = _reset_control(control) 

318 reset_button.clicked.connect(lambda: line_edit.setText(ctrl.default)) 

319 return hbox(line_edit, reset_button) 

320 

321 

322def bool_control(control): 

323 ctrl = control.ctrl 

324 widget = QtWidgets.QCheckBox() 

325 widget.setChecked(ctrl.value) 

326 control.valueChanged.connect(widget.setChecked) 

327 widget.clicked.connect(control.setValue) 

328 reset_button = _reset_control(control) 

329 reset_button.clicked.connect(lambda: widget.setChecked(ctrl.default)) 

330 return hbox(widget, reset_button) 

331 

332 

333def button_control(control): 

334 ctrl = control.ctrl 

335 widget = QtWidgets.QPushButton(ctrl.name) 

336 widget.clicked.connect(ctrl.push) 

337 control.valueChanged.connect(lambda _: log.info(f"Someone clicked {ctrl.name}")) 

338 return widget 

339 

340 

341def integer_control(control): 

342 ctrl = control.ctrl 

343 value = ctrl.value 

344 slider = QtWidgets.QSlider() 

345 slider.setOrientation(QtCore.Qt.Orientation.Horizontal) 

346 slider.setRange(ctrl.minimum, ctrl.maximum) 

347 slider.setValue(value) 

348 spin = QtWidgets.QSpinBox() 

349 spin.setRange(ctrl.minimum, ctrl.maximum) 

350 spin.setValue(value) 

351 

352 def on_slider_value(v): 

353 control.setValue(v) 

354 with signals_blocked(spin): 

355 spin.setValue(v) 

356 

357 def on_spin_value(v): 

358 control.setValue(v) 

359 with signals_blocked(slider): 

360 slider.setValue(v) 

361 

362 def on_ctrl_value(v): 

363 with signals_blocked(spin): 

364 spin.setValue(v) 

365 with signals_blocked(slider): 

366 slider.setValue(v) 

367 

368 control.valueChanged.connect(on_ctrl_value) 

369 slider.valueChanged.connect(on_slider_value) 

370 spin.valueChanged.connect(on_spin_value) 

371 reset_button = _reset_control(control) 

372 

373 def reset(): 

374 on_ctrl_value(ctrl.default) 

375 ctrl.set_to_default() 

376 

377 reset_button.clicked.connect(reset) 

378 return hbox(slider, spin, reset_button) 

379 

380 

381class QControl(QtCore.QObject): 

382 valueChanged = QtCore.Signal(object) 

383 

384 def __init__(self, ctrl: BaseControl): 

385 super().__init__() 

386 self.ctrl = ctrl 

387 

388 def setValue(self, value): 

389 log.info("set value %r to %s", self.ctrl.name, value) 

390 self.ctrl.value = value 

391 

392 def create_widget(self): 

393 ctrl = self.ctrl 

394 widget = None 

395 if ctrl.is_flagged_has_payload and ctrl.type != ControlType.STRING: 

396 return 

397 if ctrl.type in {ControlType.INTEGER, ControlType.U8, ControlType.U16, ControlType.U32}: 

398 widget = integer_control(self) 

399 elif ctrl.type == ControlType.INTEGER64: 

400 pass # TODO 

401 elif ctrl.type == ControlType.BOOLEAN: 

402 widget = bool_control(self) 

403 elif ctrl.type == ControlType.STRING: 

404 widget = text_control(self) 

405 elif ctrl.type in {ControlType.MENU, ControlType.INTEGER_MENU}: 

406 widget = menu_control(self) 

407 elif ctrl.type == ControlType.BUTTON: 

408 widget = button_control(self) 

409 if widget is not None: 

410 widget.setToolTip(ctrl.name) 

411 widget.setEnabled(ctrl.is_writeable) 

412 return widget 

413 

414 

415def control_group_widget(widgets): 

416 group = QtWidgets.QWidget() 

417 layout = QtWidgets.QFormLayout(group) 

418 for qctrl, widget in widgets: 

419 if qctrl.ctrl.type == ControlType.BUTTON: 

420 layout.addWidget(widget) 

421 else: 

422 layout.addRow(f"{qctrl.ctrl.name}:", widget) 

423 return group 

424 

425 

426def control_widgets(camera): 

427 widgets = collections.defaultdict(list) 

428 for ctrl_id, ctrl in camera.device.controls.items(): 

429 if ctrl.is_flagged_has_payload and ctrl.type != ControlType.STRING: 

430 continue 

431 qctrl = camera.qcontrol(ctrl_id) 

432 if (widget := qctrl.create_widget()) is None: 

433 continue 

434 if (klass := ctrl.control_class) is None: 

435 name = "Generic" 

436 else: 

437 name = klass.name.decode() 

438 widgets[name].append((qctrl, widget)) 

439 return widgets 

440 

441 

442class QControlPanel(QtWidgets.QTabWidget): 

443 def __init__(self, camera: QCamera): 

444 super().__init__() 

445 self.camera = camera 

446 self.setWindowTitle(f"{camera.device.info.card} @ {camera.device.filename}") 

447 self.fill() 

448 

449 def fill(self): 

450 group_widgets = control_widgets(self.camera) 

451 for name, widgets in group_widgets.items(): 

452 area = QtWidgets.QScrollArea() 

453 tab = control_group_widget(widgets) 

454 area.setWidget(tab) 

455 self.addTab(area, name) 

456 

457 

458def fill_info_panel(camera: QCamera, widget): 

459 device = camera.device 

460 info = device.info 

461 layout = QtWidgets.QFormLayout(widget) 

462 layout.addRow("Device:", QtWidgets.QLabel(str(device.filename))) 

463 layout.addRow("Card:", QtWidgets.QLabel(info.card)) 

464 layout.addRow("Driver:", QtWidgets.QLabel(info.driver)) 

465 layout.addRow("Bus:", QtWidgets.QLabel(info.bus_info)) 

466 layout.addRow("Version:", QtWidgets.QLabel(info.version)) 

467 

468 

469def fill_inputs_panel(camera: QCamera, widget): 

470 def on_camera_state(state): 

471 stopped = state == "stopped" 

472 inputs_combo.setEnabled(stopped) 

473 sizes_combo.setEnabled(stopped) 

474 width_spin.setEnabled(stopped) 

475 height_spin.setEnabled(stopped) 

476 fps_combo.setEnabled(stopped) 

477 pixfmt_combo.setEnabled(stopped) 

478 

479 def on_input(index): 

480 if index < 0: 

481 return 

482 device.set_input(inputs_combo.currentData()) 

483 update_sizes() 

484 update_fps() 

485 update_formats() 

486 

487 def on_frame_size(_): 

488 fmt = device.get_format(BufferType.VIDEO_CAPTURE) 

489 if sizes_combo.isEnabled(): 

490 if sizes_combo.currentIndex() < 0: 

491 return 

492 width, height = sizes_combo.currentData() 

493 else: 

494 width, height = width_spin.value(), height_spin.value() 

495 device.set_format(BufferType.VIDEO_CAPTURE, width, height, fmt.pixel_format) 

496 update_fps() 

497 

498 def on_fps(index): 

499 if index < 0: 

500 return 

501 device.set_fps(BufferType.VIDEO_CAPTURE, fps_combo.currentData()) 

502 

503 def on_format(index): 

504 if index < 0: 

505 return 

506 curr_fmt = device.get_format(BufferType.VIDEO_CAPTURE) 

507 pixel_format = pixfmt_combo.currentData() 

508 device.set_format(BufferType.VIDEO_CAPTURE, curr_fmt.width, curr_fmt.height, pixel_format) 

509 update_fps() 

510 

511 def update_input(): 

512 curr_input = device.get_input() 

513 inputs_combo.clear() 

514 for inp in info.inputs: 

515 inputs_combo.addItem(inp.name, inp.index) 

516 inputs_combo.setCurrentIndex(curr_input) 

517 

518 def update_formats(): 

519 curr_fmt = capture.get_format() 

520 formats = info.buffer_formats(BufferType.VIDEO_CAPTURE) 

521 pixfmt_combo.clear() 

522 for fmt in sorted(formats, key=lambda fmt: fmt.pixel_format.name): 

523 pixfmt_combo.addItem(fmt.pixel_format.name, fmt.pixel_format) 

524 pixfmt_combo.setCurrentText(curr_fmt.pixel_format.name) 

525 

526 def update_sizes(): 

527 curr_fmt = capture.get_format() 

528 sizes = info.format_frame_sizes(curr_fmt.pixel_format) 

529 continuous = len(sizes) == 1 and sizes[0].type != FrameSizeType.DISCRETE 

530 if continuous: 

531 size = sizes[0] 

532 width_spin.setRange(size.info.min_width, size.info.max_width) 

533 width_spin.setSingleStep(size.info.step_width) 

534 width_spin.setValue(curr_fmt.width) 

535 height_spin.setRange(size.info.min_height, size.info.max_height) 

536 height_spin.setSingleStep(size.info.step_height) 

537 height_spin.setValue(curr_fmt.height) 

538 else: 

539 sizes_combo.clear() 

540 sizes = {(size.info.width, size.info.height) for size in sizes} 

541 for size in sorted(sizes): 

542 width, height = size 

543 sizes_combo.addItem(f"{width}x{height}", (width, height)) 

544 sizes_combo.setCurrentText(f"{curr_fmt.width}x{curr_fmt.height}") 

545 layout.setRowVisible(width_spin, continuous) 

546 layout.setRowVisible(height_spin, continuous) 

547 layout.setRowVisible(sizes_combo, not continuous) 

548 

549 def update_fps(): 

550 curr_fmt = capture.get_format() 

551 opts = info.fps_intervals(curr_fmt.pixel_format, curr_fmt.width, curr_fmt.height) 

552 opts = (opt.min_fps for opt in opts if opt.min_fps == opt.max_fps) 

553 curr_fps = capture.get_fps() 

554 fps_combo.clear() 

555 for fps in sorted(opts): 

556 fps_combo.addItem(str(fps), fps) 

557 fps_combo.setCurrentText(str(curr_fps)) 

558 

559 camera.stateChanged.connect(on_camera_state) 

560 

561 device = camera.device 

562 capture = camera.capture 

563 info = device.info 

564 

565 layout = QtWidgets.QFormLayout(widget) 

566 inputs_combo = QtWidgets.QComboBox() 

567 layout.addRow("Input:", inputs_combo) 

568 inputs_combo.currentIndexChanged.connect(on_input) 

569 

570 sizes_combo = QtWidgets.QComboBox() 

571 sizes_combo.currentTextChanged.connect(on_frame_size) 

572 layout.addRow("Size:", sizes_combo) 

573 width_spin = QtWidgets.QSpinBox() 

574 height_spin = QtWidgets.QSpinBox() 

575 layout.addRow("Width:", width_spin) 

576 layout.addRow("Height:", height_spin) 

577 width_spin.valueChanged.connect(on_frame_size) 

578 height_spin.valueChanged.connect(on_frame_size) 

579 

580 fps_combo = QtWidgets.QComboBox() 

581 fps_combo.currentIndexChanged.connect(on_fps) 

582 layout.addRow("FPS:", fps_combo) 

583 

584 pixfmt_combo = QtWidgets.QComboBox() 

585 layout.addRow("Format:", pixfmt_combo) 

586 pixfmt_combo.currentIndexChanged.connect(on_format) 

587 

588 update_input() 

589 update_sizes() 

590 update_fps() 

591 update_formats() 

592 

593 

594class QSettingsPanel(QtWidgets.QWidget): 

595 def __init__(self, camera: QCamera): 

596 super().__init__() 

597 self.camera = camera 

598 self.fill() 

599 

600 def fill(self): 

601 layout = QtWidgets.QVBoxLayout(self) 

602 info = QtWidgets.QGroupBox("General Information") 

603 fill_info_panel(self.camera, info) 

604 layout.addWidget(info) 

605 if self.camera.device.info.inputs: 

606 inputs = QtWidgets.QGroupBox("Input Settings") 

607 fill_inputs_panel(self.camera, inputs) 

608 layout.addWidget(inputs) 

609 

610 

611def to_qpixelformat(pixel_format: PixelFormat) -> QtGui.QPixelFormat | None: 

612 if pixel_format == PixelFormat.YUYV: 

613 return QtGui.qPixelFormatYuv(QtGui.QPixelFormat.YUVLayout.YUYV) 

614 

615 

616FORMAT_MAP = { 

617 PixelFormat.RGB24: QtGui.QImage.Format.Format_RGB888, # ok 

618 PixelFormat.BGR24: QtGui.QImage.Format.Format_BGR888, # ok 

619 PixelFormat.RGB565: QtGui.QImage.Format.Format_RGB16, # ok 

620 PixelFormat.RGBX555: QtGui.QImage.Format.Format_RGB555, # ok ? Transparency 

621 PixelFormat.XRGB444: QtGui.QImage.Format.Format_RGB444, # ok 

622 PixelFormat.RGB32: QtGui.QImage.Format.Format_ARGB32, # wrong 

623 PixelFormat.RGBA32: QtGui.QImage.Format.Format_RGBA8888, # wrong no image! 

624 PixelFormat.ARGB32: QtGui.QImage.Format.Format_ARGB32, # wrong 

625 PixelFormat.XRGB32: QtGui.QImage.Format.Format_RGB32, # wrong (same problem as RGB32) 

626 PixelFormat.GREY: QtGui.QImage.Format.Format_Grayscale8, # ok 

627 PixelFormat.Y16: QtGui.QImage.Format.Format_Grayscale16, # ok 

628 PixelFormat.RGBX32: QtGui.QImage.Format.Format_RGBX8888, # ok 

629 PixelFormat.RGBX1010102: QtGui.QImage.Format.Format_RGB30, 

630} 

631 

632OPENCV_FORMATS = { 

633 PixelFormat.YUYV, 

634 PixelFormat.YVYU, 

635 PixelFormat.UYVY, 

636 PixelFormat.YUV420, 

637 PixelFormat.NV12, 

638 PixelFormat.NV21, 

639} 

640 

641 

642def create_opencv_buffer(format): 

643 import numpy 

644 

645 depth = 3 

646 dtype = numpy.ubyte 

647 return numpy.empty((format.height, format.width, depth), dtype=dtype) 

648 

649 

650def opencv_frame_to_rgb24(frame, into=None): 

651 import cv2 

652 

653 YUV_MAP = { 

654 PixelFormat.YUYV: cv2.COLOR_YUV2RGB_YUYV, 

655 PixelFormat.YVYU: cv2.COLOR_YUV2RGB_YVYU, 

656 PixelFormat.UYVY: cv2.COLOR_YUV2RGB_UYVY, 

657 PixelFormat.YUV420: cv2.COLOR_YUV2RGB_I420, 

658 PixelFormat.NV12: cv2.COLOR_YUV2RGB_NV12, 

659 PixelFormat.NV21: cv2.COLOR_YUV2RGB_NV21, 

660 } 

661 data = frame.array 

662 fmt = frame.pixel_format 

663 if fmt in {PixelFormat.NV12, PixelFormat.NV21, PixelFormat.YUV420}: 

664 data.shape = frame.height * 3 // 2, frame.width, -1 

665 else: 

666 data.shape = frame.height, frame.width, -1 

667 result = cv2.cvtColor(data, YUV_MAP[fmt], dst=into) 

668 return result 

669 

670 

671def frame_to_qimage(frame: Frame) -> QtGui.QImage | None: 

672 """Translates a Frame to a QImage""" 

673 data = frame.data 

674 if frame.pixel_format == PixelFormat.MJPEG: 

675 return QtGui.QImage.fromData(data, "JPG") 

676 

677 if frame.pixel_format in OPENCV_FORMATS: 

678 data = opencv_frame_to_rgb24(frame) 

679 fmt = QtGui.QImage.Format.Format_RGB888 

680 else: 

681 if (fmt := FORMAT_MAP.get(frame.pixel_format)) is None: 

682 return None 

683 return QtGui.QImage(data, frame.width, frame.height, fmt) 

684 

685 

686def frame_to_qpixmap(frame: Frame) -> QtGui.QPixmap: 

687 if frame.pixel_format == PixelFormat.MJPEG: 

688 pixmap = QtGui.QPixmap(frame.width, frame.height) 

689 pixmap.loadFromData(frame.data, "JPG") 

690 return pixmap 

691 qimage = frame_to_qimage(frame) 

692 return QtGui.QPixmap.fromImage(qimage) 

693 

694 

695def draw_no_image_rect(painter, rect, line_width=4): 

696 color = QtGui.QColor(255, 0, 0, 100) 

697 pen = QtGui.QPen(color, line_width) 

698 painter.setPen(pen) 

699 painter.setBrush(QtCore.Qt.NoBrush) 

700 painter.drawLines( 

701 ( 

702 QtCore.QLineF(rect.topLeft(), rect.bottomRight()), 

703 QtCore.QLineF(rect.bottomLeft(), rect.topRight()), 

704 ) 

705 ) 

706 half_line_width = line_width // 2 

707 rect.setLeft(rect.left() + half_line_width) 

708 rect.setRight(rect.right() - half_line_width) 

709 rect.setTop(rect.top() + half_line_width) 

710 rect.setBottom(rect.bottom() - half_line_width) 

711 painter.drawRect(rect) 

712 

713 

714def draw_no_image(painter, width, height, line_width=4): 

715 rect = QtCore.QRectF(0, 0, width, height) 

716 return draw_no_image_rect(painter, rect, line_width) 

717 

718 

719class BaseCameraControl: 

720 def __init__(self, camera: QCamera | None = None): 

721 self.camera = None 

722 self.set_camera(camera) 

723 

724 def set_camera(self, camera: QCamera | None): 

725 if self.camera: 

726 self.camera.stateChanged.disconnect(self.on_camera_state_changed) 

727 self.camera = camera 

728 if self.camera: 

729 self.camera.stateChanged.connect(self.on_camera_state_changed) 

730 state = self.camera.state() 

731 else: 

732 state = None 

733 self.on_camera_state_changed(state) 

734 

735 def on_camera_state_changed(self, state): 

736 pass 

737 

738 

739class PlayButtonControl(BaseCameraControl): 

740 play_icon = "media-playback-start" 

741 stop_icon = "media-playback-stop" 

742 

743 def __init__(self, button, camera: QCamera | None = None): 

744 self.button = button 

745 self.button.clicked.connect(self.on_click) 

746 super().__init__(camera) 

747 

748 def on_camera_state_changed(self, state): 

749 enabled = True 

750 icon = self.stop_icon 

751 if state is None: 

752 enabled = False 

753 tip = "No camera attached to this button" 

754 elif state == "stopped": 

755 icon = self.play_icon 

756 tip = "Camera is stopped. Press to start rolling" 

757 else: 

758 tip = f"Camera is {state}. Press to stop it" 

759 self.button.setEnabled(enabled) 

760 self.button.setIcon(QtGui.QIcon.fromTheme(icon)) 

761 self.button.setToolTip(tip) 

762 

763 def on_click(self): 

764 if self.camera: 

765 if self.camera.state() == "stopped": 

766 self.camera.start() 

767 else: 

768 self.camera.stop() 

769 

770 

771class PauseButtonControl(BaseCameraControl): 

772 play_icon = "media-playback-start" 

773 pause_icon = "media-playback-pause" 

774 

775 def __init__(self, button, camera: QCamera | None = None): 

776 self.button = button 

777 self.button.clicked.connect(self.on_click) 

778 super().__init__(camera) 

779 

780 def on_camera_state_changed(self, state): 

781 if state is None: 

782 tip = "No camera attached to this button" 

783 elif state == "stopped": 

784 tip = "Camera is stopped" 

785 elif state == "paused": 

786 tip = "Camera is paused. Press to resume" 

787 else: 

788 tip = f"Camera is {state}. Press to pause" 

789 enabled = state not in {None, "stopped"} 

790 icon = self.play_icon if state == "paused" else self.pause_icon 

791 self.button.setEnabled(enabled) 

792 self.button.setIcon(QtGui.QIcon.fromTheme(icon)) 

793 self.button.setToolTip(tip) 

794 

795 def on_click(self): 

796 if self.camera is None: 

797 return 

798 if self.camera.state() == "running": 

799 self.camera.pause() 

800 else: 

801 self.camera.resume() 

802 

803 

804class StateControl(BaseCameraControl): 

805 def __init__(self, label, camera: QCamera | None = None): 

806 self.label = label 

807 super().__init__(camera) 

808 

809 def on_camera_state_changed(self, state): 

810 if state is None: 

811 state = "---" 

812 self.label.setText(state) 

813 

814 

815class QVideoControls(QtWidgets.QWidget): 

816 def __init__(self, camera: QCamera | None = None): 

817 super().__init__() 

818 self.camera = None 

819 self._init() 

820 self.set_camera(camera) 

821 

822 def _init(self): 

823 layout = QtWidgets.QHBoxLayout() 

824 layout.setContentsMargins(0, 0, 0, 0) 

825 self.setLayout(layout) 

826 state_label = QtWidgets.QLabel("") 

827 play_button = QtWidgets.QToolButton() 

828 pause_button = QtWidgets.QToolButton() 

829 self.camera_label = QtWidgets.QLabel("") 

830 layout.addWidget(self.camera_label) 

831 layout.addStretch(1) 

832 layout.addWidget(state_label) 

833 layout.addWidget(play_button) 

834 layout.addWidget(pause_button) 

835 self.state_control = StateControl(state_label) 

836 self.play_control = PlayButtonControl(play_button) 

837 self.pause_control = PauseButtonControl(pause_button) 

838 

839 def set_camera(self, camera: QCamera | None = None): 

840 self.state_control.set_camera(camera) 

841 self.play_control.set_camera(camera) 

842 self.pause_control.set_camera(camera) 

843 if camera is None: 

844 name = "---" 

845 else: 

846 name = camera.device.filename.stem 

847 self.camera_label.setText(name) 

848 self.camera = camera 

849 

850 

851def paint_image(painter, width, height, qimage=None): 

852 if qimage is None: 

853 draw_no_image(painter, width, height) 

854 return 

855 image_width = qimage.width() 

856 image_height = qimage.height() 

857 scale = min(width / image_width, height / image_height) 

858 rect_width = int(image_width * scale) 

859 rect_height = int(image_height * scale) 

860 x = int((width - rect_width) / 2) 

861 y = int((height - rect_height) / 2) 

862 rect = QtCore.QRect(x, y, rect_width, rect_height) 

863 painter.drawImage(rect, qimage) 

864 

865 

866class VideoMixin: 

867 qimage = None 

868 

869 def imageRect(self): 

870 return self.rect() 

871 

872 def paint(self, painter, _, *args): 

873 rect = self.imageRect() 

874 paint_image(painter, rect.width(), rect.height(), self.qimage) 

875 

876 def on_image_changed(self, qimage): 

877 self.qimage = qimage 

878 self.update() 

879 

880 

881class QVideo(VideoMixin, QtWidgets.QWidget): 

882 def paintEvent(self, _): 

883 self.paint(QtGui.QPainter(self), None) 

884 

885 def minimumSizeHint(self): 

886 return QtCore.QSize(160, 120) 

887 

888 

889class QVideoItem(VideoMixin, QtWidgets.QGraphicsItem): 

890 def boundingRect(self): 

891 width, height = 640, 480 

892 if self.qimage: 

893 width, height = self.qimage.width(), self.qimage.height() 

894 return QtCore.QRectF(0, 0, width, height) 

895 

896 imageRect = boundingRect 

897 

898 

899class QVideoWidget(QtWidgets.QWidget): 

900 def __init__(self, camera=None): 

901 super().__init__() 

902 layout = QtWidgets.QVBoxLayout(self) 

903 self.video = QVideo() 

904 self.stream = QVideoStream(camera) 

905 self.stream.imageChanged.connect(self.video.on_image_changed) 

906 self.controls = QVideoControls(camera) 

907 layout.addWidget(self.video) 

908 layout.addWidget(self.controls) 

909 layout.setStretchFactor(self.video, 1) 

910 

911 def set_camera(self, camera: QCamera | None = None): 

912 self.stream.set_camera(camera) 

913 self.controls.set_camera(camera) 

914 

915 

916def main(): 

917 import argparse 

918 

919 def stop(): 

920 if camera.state() != "stopped": 

921 camera.stop() 

922 

923 parser = argparse.ArgumentParser() 

924 parser.add_argument("--log-level", choices=["debug", "info", "warning", "error"], default="info") 

925 parser.add_argument("device", type=int) 

926 args = parser.parse_args() 

927 fmt = "%(threadName)-10s %(asctime)-15s %(levelname)-5s %(name)s: %(message)s" 

928 logging.basicConfig(level=args.log_level.upper(), format=fmt) 

929 app = QtWidgets.QApplication([]) 

930 with QCamera.from_id(args.device, blocking=False) as camera: 

931 window = QtWidgets.QWidget() 

932 layout = QtWidgets.QHBoxLayout(window) 

933 video = QVideoWidget(camera) 

934 video.setMinimumSize(640, 480) 

935 panel = QControlPanel(camera) 

936 settings = QSettingsPanel(camera) 

937 layout.addWidget(settings) 

938 layout.addWidget(video) 

939 layout.addWidget(panel) 

940 layout.setStretchFactor(video, 1) 

941 window.show() 

942 app.aboutToQuit.connect(stop) 

943 app.exec() 

944 

945 

946if __name__ == "__main__": 

947 main()