Coverage for linuxpy/video/device.py: 82%

1564 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2026-02-19 15:11 +0100

1# 

2# This file is part of the linuxpy project 

3# 

4# Copyright (c) 2023 Tiago Coutinho 

5# Distributed under the GPLv3 license. See LICENSE for more info. 

6 

7"""Human friendly interface to V4L2 (Video 4 Linux 2) subsystem.""" 

8 

9import asyncio 

10import collections 

11import contextlib 

12import copy 

13import ctypes 

14import enum 

15import errno 

16import fractions 

17import logging 

18import mmap 

19import os 

20import select 

21import time 

22from collections import UserDict 

23from pathlib import Path 

24 

25from linuxpy.ctypes import cast, cenum, create_string_buffer, memcpy, string_at 

26from linuxpy.device import ( 

27 BaseDevice, 

28 ReentrantOpen, 

29 iter_device_files, 

30) 

31from linuxpy.io import IO 

32from linuxpy.ioctl import ioctl 

33from linuxpy.types import AsyncIterator, Buffer, Callable, Iterable, Iterator, Optional, PathLike, Self, Union 

34from linuxpy.util import astream, bit_indexes, make_find, to_fd 

35 

36from . import raw 

37 

38log = logging.getLogger(__name__) 

39log_mmap = log.getChild("mmap") 

40 

41 

42class V4L2Error(Exception): 

43 """Video for linux 2 error""" 

44 

45 

46def _enum(name, prefix, klass=enum.IntEnum): 

47 return klass( 

48 name, 

49 ((name.replace(prefix, ""), getattr(raw, name)) for name in dir(raw) if name.startswith(prefix)), 

50 ) 

51 

52 

53FrameSizeType = raw.Frmsizetypes 

54FrameIntervalType = raw.Frmivaltypes 

55Field = raw.Field 

56ImageFormatFlag = raw.ImageFormatFlag 

57Capability = raw.Capability 

58ControlID = raw.ControlID 

59ControlFlag = raw.ControlFlag 

60ControlType = raw.CtrlType 

61ControlClass = raw.ControlClass 

62SelectionTarget = raw.SelectionTarget 

63EventType = raw.EventType 

64EventControlChange = raw.EventControlChange 

65IOC = raw.IOC 

66BufferType = raw.BufType 

67BufferFlag = raw.BufferFlag 

68InputType = raw.InputType 

69PixelFormat = raw.PixelFormat 

70MetaFormat = raw.MetaFormat 

71Memory = raw.Memory 

72InputStatus = raw.InputStatus 

73OutputType = raw.OutputType 

74InputCapabilities = raw.InputCapabilities 

75OutputCapabilities = raw.OutputCapabilities 

76Priority = raw.Priority 

77TimeCodeType = raw.TimeCodeType 

78TimeCodeFlag = raw.TimeCodeFlag 

79EventSubscriptionFlag = raw.EventSubscriptionFlag 

80StandardID = raw.StandardID 

81 

82 

83def V4L2_CTRL_ID2CLASS(id_): 

84 return id_ & 0x0FFF0000 # unsigned long 

85 

86 

87def human_pixel_format(ifmt): 

88 return "".join(map(chr, ((ifmt >> i) & 0xFF for i in range(0, 4 * 8, 8)))) 

89 

90 

91PixelFormat.human_str = lambda self: human_pixel_format(self.value) 

92MetaFormat.human_str = lambda self: human_pixel_format(self.value) 

93 

94 

95ImageFormat = collections.namedtuple("ImageFormat", "type description flags pixel_format") 

96 

97MetaFmt = collections.namedtuple("MetaFmt", "format max_buffer_size width height bytes_per_line") 

98 

99Format = collections.namedtuple("Format", "width height pixel_format size bytes_per_line") 

100 

101CropCapability = collections.namedtuple("CropCapability", "type bounds defrect pixel_aspect") 

102 

103Rect = collections.namedtuple("Rect", "left top width height") 

104 

105Size = collections.namedtuple("Size", "width height") 

106 

107FrameType = collections.namedtuple("FrameType", "type pixel_format width height min_fps max_fps step_fps") 

108 

109Input = collections.namedtuple("InputType", "index name type audioset tuner std status capabilities") 

110 

111Output = collections.namedtuple("OutputType", "index name type audioset modulator std capabilities") 

112 

113Standard = collections.namedtuple("Standard", "index id name frameperiod framelines") 

114 

115FrameSize = collections.namedtuple("FrameSize", "index pixel_format type info") 

116 

117 

118CROP_BUFFER_TYPES = { 

119 BufferType.VIDEO_CAPTURE, 

120 BufferType.VIDEO_CAPTURE_MPLANE, 

121 BufferType.VIDEO_OUTPUT, 

122 BufferType.VIDEO_OUTPUT_MPLANE, 

123 BufferType.VIDEO_OVERLAY, 

124} 

125 

126IMAGE_FORMAT_BUFFER_TYPES = { 

127 BufferType.VIDEO_CAPTURE, 

128 BufferType.VIDEO_CAPTURE_MPLANE, 

129 BufferType.VIDEO_OUTPUT, 

130 BufferType.VIDEO_OUTPUT_MPLANE, 

131 BufferType.VIDEO_OVERLAY, 

132 BufferType.META_CAPTURE, 

133 BufferType.META_OUTPUT, 

134} 

135 

136 

137def mem_map(fd, length, offset): 

138 log_mmap.debug("%s, length=%d, offset=%d", fd, length, offset) 

139 return mmap.mmap(to_fd(fd), length, offset=offset) 

140 

141 

142def flag_items(flag): 

143 return [item for item in type(flag) if item in flag] 

144 

145 

146def raw_crop_caps_to_crop_caps(crop): 

147 return CropCapability( 

148 type=BufferType(crop.type), 

149 bounds=Rect( 

150 crop.bounds.left, 

151 crop.bounds.top, 

152 crop.bounds.width, 

153 crop.bounds.height, 

154 ), 

155 defrect=Rect( 

156 crop.defrect.left, 

157 crop.defrect.top, 

158 crop.defrect.width, 

159 crop.defrect.height, 

160 ), 

161 pixel_aspect=crop.pixelaspect.numerator / crop.pixelaspect.denominator, 

162 ) 

163 

164 

165CropCapability.from_raw = raw_crop_caps_to_crop_caps 

166 

167 

168def raw_read_crop_capabilities(fd, buffer_type: BufferType) -> raw.v4l2_cropcap: 

169 crop = raw.v4l2_cropcap() 

170 crop.type = buffer_type 

171 return ioctl(fd, IOC.CROPCAP, crop) 

172 

173 

174def read_crop_capabilities(fd, buffer_type: BufferType) -> CropCapability: 

175 try: 

176 crop = raw_read_crop_capabilities(fd, buffer_type) 

177 except OSError as error: 

178 if error.errno == errno.ENODATA: 

179 return None 

180 raise 

181 return raw_crop_caps_to_crop_caps(crop) 

182 

183 

184ITER_BREAK = (errno.ENOTTY, errno.ENODATA, errno.EPIPE) 

185 

186 

187def iter_read(fd, ioc, indexed_struct, start=0, stop=128, step=1, ignore_einval=False): 

188 for index in range(start, stop, step): 

189 indexed_struct.index = index 

190 try: 

191 ioctl(fd, ioc, indexed_struct) 

192 yield indexed_struct 

193 except OSError as error: 

194 if error.errno == errno.EINVAL: 

195 if ignore_einval: 

196 continue 

197 else: 

198 break 

199 elif error.errno in ITER_BREAK: 

200 break 

201 else: 

202 raise 

203 

204 

205def iter_read_frame_intervals(fd, fmt, w, h): 

206 value = raw.v4l2_frmivalenum() 

207 value.pixel_format = fmt 

208 value.width = w 

209 value.height = h 

210 count = 0 

211 for val in iter_read(fd, IOC.ENUM_FRAMEINTERVALS, value): 

212 # values come in frame interval (fps = 1/interval) 

213 try: 

214 ftype = FrameIntervalType(val.type) 

215 except ValueError: 

216 break 

217 if ftype == FrameIntervalType.DISCRETE: 

218 min_fps = max_fps = step_fps = fractions.Fraction(val.discrete.denominator / val.discrete.numerator) 

219 else: 

220 if val.stepwise.min.numerator == 0: 

221 min_fps = 0 

222 else: 

223 min_fps = fractions.Fraction(val.stepwise.min.denominator, val.stepwise.min.numerator) 

224 if val.stepwise.max.numerator == 0: 

225 max_fps = 0 

226 else: 

227 max_fps = fractions.Fraction(val.stepwise.max.denominator, val.stepwise.max.numerator) 

228 if val.stepwise.step.numerator == 0: 

229 step_fps = 0 

230 else: 

231 step_fps = fractions.Fraction(val.stepwise.step.denominator, val.stepwise.step.numerator) 

232 yield FrameType( 

233 type=ftype, 

234 pixel_format=fmt, 

235 width=w, 

236 height=h, 

237 min_fps=min_fps, 

238 max_fps=max_fps, 

239 step_fps=step_fps, 

240 ) 

241 count += 1 

242 if not count: 

243 # If it wasn't possible to get frame interval, report discovered frame size anyway 

244 yield FrameType( 

245 type=FrameIntervalType.DISCRETE, 

246 pixel_format=fmt, 

247 width=w, 

248 height=h, 

249 min_fps=0, 

250 max_fps=0, 

251 step_fps=0, 

252 ) 

253 

254 

255def iter_read_frame_sizes(fd, pixel_format): 

256 size = raw.v4l2_frmsizeenum() 

257 size.index = 0 

258 size.pixel_format = pixel_format 

259 for val in iter_read(fd, IOC.ENUM_FRAMESIZES, size): 

260 type = FrameSizeType(val.type) 

261 if type == FrameSizeType.DISCRETE: 

262 info = val.m1.discrete 

263 else: 

264 info = val.m1.stepwise 

265 yield FrameSize(val.index, PixelFormat(val.pixel_format), FrameSizeType(val.type), copy.copy(info)) 

266 

267 

268def iter_read_pixel_formats_frame_intervals(fd, pixel_formats): 

269 for pixel_format in pixel_formats: 

270 for size in iter_read_frame_sizes(fd, pixel_format): 

271 if size.type == FrameSizeType.DISCRETE: 

272 yield from iter_read_frame_intervals(fd, pixel_format, size.info.width, size.info.height) 

273 

274 

275def read_capabilities(fd): 

276 caps = raw.v4l2_capability() 

277 ioctl(fd, IOC.QUERYCAP, caps) 

278 return caps 

279 

280 

281def iter_read_formats(fd, type): 

282 format = raw.v4l2_fmtdesc() 

283 format.type = type 

284 pixel_formats = set(PixelFormat) 

285 meta_formats = set(MetaFormat) 

286 for fmt in iter_read(fd, IOC.ENUM_FMT, format): 

287 pixel_fmt = fmt.pixelformat 

288 if type in {BufferType.VIDEO_CAPTURE, BufferType.VIDEO_OUTPUT}: 

289 if pixel_fmt not in pixel_formats: 

290 log.warning( 

291 "ignored unknown pixel format %s (%d)", 

292 human_pixel_format(pixel_fmt), 

293 pixel_fmt, 

294 ) 

295 continue 

296 pixel_format = PixelFormat(pixel_fmt) 

297 elif type in {BufferType.META_CAPTURE, BufferType.META_OUTPUT}: 

298 if pixel_fmt not in meta_formats: 

299 log.warning( 

300 "ignored unknown meta format %s (%d)", 

301 human_pixel_format(pixel_fmt), 

302 pixel_fmt, 

303 ) 

304 continue 

305 pixel_format = MetaFormat(pixel_fmt) 

306 image_format = ImageFormat( 

307 type=type, 

308 flags=ImageFormatFlag(fmt.flags), 

309 description=fmt.description.decode(), 

310 pixel_format=pixel_format, 

311 ) 

312 yield image_format 

313 

314 

315def iter_read_inputs(fd): 

316 input = raw.v4l2_input() 

317 for inp in iter_read(fd, IOC.ENUMINPUT, input): 

318 input_type = Input( 

319 index=inp.index, 

320 name=inp.name.decode(), 

321 type=InputType(inp.type), 

322 audioset=bit_indexes(inp.audioset), 

323 tuner=inp.tuner, 

324 std=StandardID(inp.std), 

325 status=InputStatus(inp.status), 

326 capabilities=InputCapabilities(inp.capabilities), 

327 ) 

328 yield input_type 

329 

330 

331def iter_read_outputs(fd): 

332 output = raw.v4l2_output() 

333 for out in iter_read(fd, IOC.ENUMOUTPUT, output): 

334 output_type = Output( 

335 index=out.index, 

336 name=out.name.decode(), 

337 type=OutputType(out.type), 

338 audioset=bit_indexes(out.audioset), 

339 modulator=out.modulator, 

340 std=StandardID(out.std), 

341 capabilities=OutputCapabilities(out.capabilities), 

342 ) 

343 yield output_type 

344 

345 

346def iter_read_video_standards(fd): 

347 std = raw.v4l2_standard() 

348 for item in iter_read(fd, IOC.ENUMSTD, std): 

349 period = item.frameperiod 

350 yield Standard( 

351 index=item.index, 

352 id=StandardID(item.id), 

353 name=item.name.decode(), 

354 frameperiod=fractions.Fraction(period.denominator, period.numerator), 

355 framelines=item.framelines, 

356 ) 

357 

358 

359def iter_read_controls(fd): 

360 ctrl = raw.v4l2_query_ext_ctrl() 

361 nxt = ControlFlag.NEXT_CTRL | ControlFlag.NEXT_COMPOUND 

362 ctrl.id = nxt 

363 for ctrl_ext in iter_read(fd, IOC.QUERY_EXT_CTRL, ctrl): 

364 yield copy.deepcopy(ctrl_ext) 

365 ctrl_ext.id |= nxt 

366 

367 

368def iter_read_menu(fd, ctrl): 

369 qmenu = raw.v4l2_querymenu() 

370 qmenu.id = ctrl.id 

371 for menu in iter_read( 

372 fd, 

373 IOC.QUERYMENU, 

374 qmenu, 

375 start=ctrl._info.minimum, 

376 stop=ctrl._info.maximum + 1, 

377 step=ctrl._info.step, 

378 ignore_einval=True, 

379 ): 

380 yield copy.deepcopy(menu) 

381 

382 

383def query_buffer(fd, buffer_type: BufferType, memory: Memory, index: int) -> raw.v4l2_buffer: 

384 buff = raw.v4l2_buffer() 

385 buff.type = buffer_type 

386 buff.memory = memory 

387 buff.index = index 

388 buff.reserved = 0 

389 ioctl(fd, IOC.QUERYBUF, buff) 

390 return buff 

391 

392 

393def enqueue_buffer_raw(fd, buff: raw.v4l2_buffer) -> raw.v4l2_buffer: 

394 if not buff.timestamp.secs: 

395 buff.timestamp.set_ns() 

396 ioctl(fd, IOC.QBUF, buff) 

397 return buff 

398 

399 

400def enqueue_buffer(fd, buffer_type: BufferType, memory: Memory, size: int, index: int) -> raw.v4l2_buffer: 

401 buff = raw.v4l2_buffer() 

402 buff.type = buffer_type 

403 buff.memory = memory 

404 buff.bytesused = size 

405 buff.index = index 

406 buff.field = Field.NONE 

407 buff.reserved = 0 

408 return enqueue_buffer_raw(fd, buff) 

409 

410 

411def dequeue_buffer(fd, buffer_type: BufferType, memory: Memory) -> raw.v4l2_buffer: 

412 buff = raw.v4l2_buffer() 

413 buff.type = buffer_type 

414 buff.memory = memory 

415 buff.index = 0 

416 buff.reserved = 0 

417 ioctl(fd, IOC.DQBUF, buff) 

418 return buff 

419 

420 

421def request_buffers(fd, buffer_type: BufferType, memory: Memory, count: int) -> raw.v4l2_requestbuffers: 

422 req = raw.v4l2_requestbuffers() 

423 req.type = buffer_type 

424 req.memory = memory 

425 req.count = count 

426 ioctl(fd, IOC.REQBUFS, req) 

427 if not req.count: 

428 raise OSError("Not enough buffer memory") 

429 return req 

430 

431 

432def free_buffers(fd, buffer_type: BufferType, memory: Memory) -> raw.v4l2_requestbuffers: 

433 req = raw.v4l2_requestbuffers() 

434 req.type = buffer_type 

435 req.memory = memory 

436 req.count = 0 

437 ioctl(fd, IOC.REQBUFS, req) 

438 return req 

439 

440 

441def export_buffer(fd, buffer_type: BufferType, index: int) -> int: 

442 req = raw.v4l2_exportbuffer(type=buffer_type, index=index) 

443 return ioctl(fd, IOC.EXPBUF, req).fd 

444 

445 

446def create_buffers(fd, format: raw.v4l2_format, memory: Memory, count: int) -> raw.v4l2_create_buffers: 

447 """Create buffers for Memory Mapped or User Pointer or DMA Buffer I/O""" 

448 req = raw.v4l2_create_buffers() 

449 req.format = format 

450 req.memory = memory 

451 req.count = count 

452 ioctl(fd, IOC.CREATE_BUFS, req) 

453 if not req.count: 

454 raise OSError("Not enough buffer memory") 

455 return req 

456 

457 

458def set_raw_format(fd, fmt: raw.v4l2_format): 

459 return ioctl(fd, IOC.S_FMT, fmt) 

460 

461 

462def set_format(fd, buffer_type: BufferType, width: int, height: int, pixel_format: str = "MJPG"): 

463 fmt = raw.v4l2_format() 

464 if isinstance(pixel_format, str): 

465 pixel_format = raw.v4l2_fourcc(*pixel_format) 

466 fmt.type = buffer_type 

467 fmt.fmt.pix.pixelformat = pixel_format 

468 fmt.fmt.pix.field = Field.ANY 

469 fmt.fmt.pix.width = width 

470 fmt.fmt.pix.height = height 

471 fmt.fmt.pix.bytesperline = 0 

472 fmt.fmt.pix.sizeimage = 0 

473 return set_raw_format(fd, fmt) 

474 

475 

476def get_raw_format(fd, buffer_type) -> raw.v4l2_format: 

477 fmt = raw.v4l2_format() 

478 fmt.type = buffer_type 

479 ioctl(fd, IOC.G_FMT, fmt) 

480 return fmt 

481 

482 

483def get_format(fd, buffer_type) -> Union[Format, MetaFmt]: 

484 f = get_raw_format(fd, buffer_type) 

485 if buffer_type in {BufferType.META_CAPTURE, BufferType.META_OUTPUT}: 

486 return MetaFmt( 

487 format=MetaFormat(f.fmt.meta.dataformat), 

488 max_buffer_size=f.fmt.meta.buffersize, 

489 width=f.fmt.meta.width, 

490 height=f.fmt.meta.height, 

491 bytes_per_line=f.fmt.meta.bytesperline, 

492 ) 

493 return Format( 

494 width=f.fmt.pix.width, 

495 height=f.fmt.pix.height, 

496 pixel_format=PixelFormat(f.fmt.pix.pixelformat), 

497 size=f.fmt.pix.sizeimage, 

498 bytes_per_line=f.fmt.pix.bytesperline, 

499 ) 

500 

501 

502def try_raw_format(fd, fmt: raw.v4l2_format): 

503 ioctl(fd, IOC.TRY_FMT, fmt) 

504 

505 

506def try_format(fd, buffer_type: BufferType, width: int, height: int, pixel_format: str = "MJPG"): 

507 fmt = raw.v4l2_format() 

508 if isinstance(pixel_format, str): 

509 pixel_format = raw.v4l2_fourcc(*pixel_format) 

510 fmt.type = buffer_type 

511 fmt.fmt.pix.pixelformat = pixel_format 

512 fmt.fmt.pix.field = Field.ANY 

513 fmt.fmt.pix.width = width 

514 fmt.fmt.pix.height = height 

515 fmt.fmt.pix.bytesperline = 0 

516 fmt.fmt.pix.sizeimage = 0 

517 return try_raw_format(fd, fmt) 

518 

519 

520def get_parm(fd, buffer_type): 

521 p = raw.v4l2_streamparm() 

522 p.type = buffer_type 

523 ioctl(fd, IOC.G_PARM, p) 

524 return p 

525 

526 

527def set_fps(fd, buffer_type, fps): 

528 # v4l2 fraction is u32 

529 max_denominator = int(min(2**32, 2**32 / fps)) 

530 p = raw.v4l2_streamparm() 

531 p.type = buffer_type 

532 fps = fractions.Fraction(fps).limit_denominator(max_denominator) 

533 if buffer_type == BufferType.VIDEO_CAPTURE: 

534 p.parm.capture.timeperframe.numerator = fps.denominator 

535 p.parm.capture.timeperframe.denominator = fps.numerator 

536 elif buffer_type == BufferType.VIDEO_OUTPUT: 

537 p.parm.output.timeperframe.numerator = fps.denominator 

538 p.parm.output.timeperframe.denominator = fps.numerator 

539 else: 

540 raise ValueError(f"Unsupported buffer type {buffer_type!r}") 

541 return ioctl(fd, IOC.S_PARM, p) 

542 

543 

544def get_fps(fd, buffer_type): 

545 p = get_parm(fd, buffer_type) 

546 if buffer_type == BufferType.VIDEO_CAPTURE: 

547 parm = p.parm.capture 

548 elif buffer_type == BufferType.VIDEO_OUTPUT: 

549 parm = p.parm.output 

550 else: 

551 raise ValueError(f"Unsupported buffer type {buffer_type!r}") 

552 return fractions.Fraction(parm.timeperframe.denominator, parm.timeperframe.numerator) 

553 

554 

555def stream_on(fd, buffer_type): 

556 btype = cenum(buffer_type) 

557 return ioctl(fd, IOC.STREAMON, btype) 

558 

559 

560def stream_off(fd, buffer_type): 

561 btype = cenum(buffer_type) 

562 return ioctl(fd, IOC.STREAMOFF, btype) 

563 

564 

565def set_selection(fd, buffer_type, target, rectangle): 

566 sel = raw.v4l2_selection() 

567 sel.type = buffer_type 

568 sel.target = target 

569 sel.r.left = rectangle.left 

570 sel.r.top = rectangle.top 

571 sel.r.width = rectangle.width 

572 sel.r.height = rectangle.height 

573 ioctl(fd, IOC.S_SELECTION, sel) 

574 

575 

576def get_selection( 

577 fd, 

578 buffer_type: BufferType, 

579 target: SelectionTarget = SelectionTarget.CROP, 

580) -> Rect: 

581 sel = raw.v4l2_selection() 

582 sel.type = buffer_type 

583 sel.target = target 

584 ioctl(fd, IOC.G_SELECTION, sel) 

585 return Rect(left=sel.r.left, top=sel.r.top, width=sel.r.width, height=sel.r.height) 

586 

587 

588def get_control(fd, id): 

589 control = raw.v4l2_control(id) 

590 ioctl(fd, IOC.G_CTRL, control) 

591 return control.value 

592 

593 

594CTRL_TYPE_CTYPE_ARRAY = { 

595 ControlType.U8: ctypes.c_uint8, 

596 ControlType.U16: ctypes.c_uint16, 

597 ControlType.U32: ctypes.c_uint32, 

598 ControlType.INTEGER: ctypes.c_int, 

599 ControlType.INTEGER64: ctypes.c_int64, 

600} 

601 

602 

603CTRL_TYPE_CTYPE_STRUCT = { 

604 # ControlType.AREA: raw.v4l2_area, 

605} 

606 

607 

608def _struct_for_ctrl_type(ctrl_type): 

609 ctrl_type = ControlType(ctrl_type).name.lower() 

610 name = f"v4l2_ctrl_{ctrl_type}" 

611 try: 

612 return getattr(raw, name) 

613 except AttributeError: 

614 name = f"v4l2_{ctrl_type}" 

615 return getattr(raw, name) 

616 

617 

618def get_ctrl_type_struct(ctrl_type): 

619 struct = CTRL_TYPE_CTYPE_STRUCT.get(ctrl_type) 

620 if struct is None: 

621 struct = _struct_for_ctrl_type(ctrl_type) 

622 CTRL_TYPE_CTYPE_STRUCT[ctrl_type] = struct 

623 return struct 

624 

625 

626def convert_to_ctypes_array(lst, depth, ctype): 

627 """Convert a list (arbitrary depth) to a ctypes array.""" 

628 if depth == 1: 

629 return (ctype * len(lst))(*lst) 

630 

631 # Recursive case: we need to process the sub-lists first 

632 sub_arrays = [convert_to_ctypes_array(sub_lst, depth - 1, ctype) for sub_lst in lst] 

633 array_type = len(sub_arrays) * type(sub_arrays[0]) # Create the array type 

634 return array_type(*sub_arrays) 

635 

636 

637def _prepare_read_control_value(control: raw.v4l2_query_ext_ctrl, raw_control: raw.v4l2_ext_control): 

638 raw_control.id = control.id 

639 has_payload = ControlFlag.HAS_PAYLOAD in ControlFlag(control.flags) 

640 if has_payload: 

641 if control.type == ControlType.STRING: 

642 size = control.maximum + 1 

643 payload = ctypes.create_string_buffer(size) 

644 raw_control.string = payload 

645 raw_control.size = size 

646 else: 

647 ctype = CTRL_TYPE_CTYPE_ARRAY.get(control.type) 

648 raw_control.size = control.elem_size * control.elems 

649 if ctype is None: 

650 ctype = get_ctrl_type_struct(control.type) 

651 payload = ctype() 

652 raw_control.ptr = ctypes.cast(ctypes.pointer(payload), ctypes.c_void_p) 

653 else: 

654 for i in range(control.nr_of_dims): 

655 ctype *= control.dims[i] 

656 payload = ctype() 

657 raw_control.size = control.elem_size * control.elems 

658 raw_control.ptr = ctypes.cast(payload, ctypes.c_void_p) 

659 return payload 

660 

661 

662def _get_control_value(control: raw.v4l2_query_ext_ctrl, raw_control: raw.v4l2_ext_control, data): 

663 if data is None: 

664 if control.type == ControlType.INTEGER64: 

665 return raw_control.value64 

666 return raw_control.value 

667 else: 

668 if control.type == ControlType.STRING: 

669 return data.value.decode() 

670 return data 

671 

672 

673def get_controls_values(fd, controls: list[raw.v4l2_query_ext_ctrl], which=raw.ControlWhichValue.CUR_VAL, request_fd=0): 

674 n = len(controls) 

675 ctrls = raw.v4l2_ext_controls() 

676 ctrls.which = which 

677 ctrls.count = n 

678 ctrls.request_fd = request_fd 

679 ctrls.controls = (n * raw.v4l2_ext_control)() 

680 values = [_prepare_read_control_value(*args) for args in zip(controls, ctrls.controls, strict=False)] 

681 ioctl(fd, IOC.G_EXT_CTRLS, ctrls) 

682 return [_get_control_value(*args) for args in zip(controls, ctrls.controls, values, strict=False)] 

683 

684 

685def set_control(fd, id, value): 

686 control = raw.v4l2_control(id, value) 

687 ioctl(fd, IOC.S_CTRL, control) 

688 

689 

690def _prepare_write_controls_values(control: raw.v4l2_query_ext_ctrl, value: object, raw_control: raw.v4l2_ext_control): 

691 raw_control.id = control.id 

692 has_payload = ControlFlag.HAS_PAYLOAD in ControlFlag(control.flags) 

693 if has_payload: 

694 if control.type == ControlType.STRING: 

695 raw_control.string = ctypes.create_string_buffer(value.encode()) 

696 raw_control.size = len(value) + 1 

697 else: 

698 array_type = CTRL_TYPE_CTYPE_ARRAY.get(control.type) 

699 raw_control.size = control.elem_size * control.elems 

700 # a struct: assume value is proper raw struct 

701 if array_type is None: 

702 value = ctypes.pointer(value) 

703 else: 

704 value = convert_to_ctypes_array(value, control.nr_of_dims, array_type) 

705 ptr = ctypes.cast(value, ctypes.c_void_p) 

706 raw_control.ptr = ptr 

707 else: 

708 if control.type == ControlType.INTEGER64: 

709 raw_control.value64 = value 

710 else: 

711 raw_control.value = value 

712 

713 

714def set_controls_values( 

715 fd, controls_values: list[tuple[raw.v4l2_query_ext_ctrl, object]], which=raw.ControlWhichValue.CUR_VAL, request_fd=0 

716): 

717 n = len(controls_values) 

718 ctrls = raw.v4l2_ext_controls() 

719 ctrls.which = which 

720 ctrls.count = n 

721 ctrls.request_fd = request_fd 

722 ctrls.controls = (n * raw.v4l2_ext_control)() 

723 for (control, value), raw_control in zip(controls_values, ctrls.controls, strict=False): 

724 _prepare_write_controls_values(control, value, raw_control) 

725 ioctl(fd, IOC.S_EXT_CTRLS, ctrls) 

726 

727 

728def get_priority(fd) -> Priority: 

729 priority = ctypes.c_uint() 

730 ioctl(fd, IOC.G_PRIORITY, priority) 

731 return Priority(priority.value) 

732 

733 

734def set_priority(fd, priority: Priority): 

735 priority = ctypes.c_uint(priority.value) 

736 ioctl(fd, IOC.S_PRIORITY, priority) 

737 

738 

739def subscribe_event( 

740 fd, 

741 event_type: EventType, 

742 id: int = 0, 

743 flags: EventSubscriptionFlag = 0, 

744): 

745 sub = raw.v4l2_event_subscription() 

746 sub.type = event_type 

747 sub.id = id 

748 sub.flags = flags 

749 ioctl(fd, IOC.SUBSCRIBE_EVENT, sub) 

750 

751 

752def unsubscribe_event(fd, event_type: EventType = EventType.ALL, id: int = 0): 

753 sub = raw.v4l2_event_subscription() 

754 sub.type = event_type 

755 sub.id = id 

756 ioctl(fd, IOC.UNSUBSCRIBE_EVENT, sub) 

757 

758 

759def deque_event(fd) -> raw.v4l2_event: 

760 event = raw.v4l2_event() 

761 return ioctl(fd, IOC.DQEVENT, event) 

762 

763 

764def set_edid(fd, edid): 

765 if len(edid) % 128: 

766 raise ValueError(f"EDID length {len(edid)} is not multiple of 128") 

767 edid_struct = raw.v4l2_edid() 

768 edid_struct.pad = 0 

769 edid_struct.start_block = 0 

770 edid_struct.blocks = len(edid) // 128 

771 edid_array = create_string_buffer(edid) 

772 edid_struct.edid = cast(edid_array, type(edid_struct.edid)) 

773 ioctl(fd, IOC.S_EDID, edid_struct) 

774 

775 

776def clear_edid(fd): 

777 set_edid(fd, b"") 

778 

779 

780def get_edid(fd): 

781 edid_struct = raw.v4l2_edid() 

782 ioctl(fd, IOC.G_EDID, edid_struct) 

783 if edid_struct.blocks == 0: 

784 return b"" 

785 edid_len = 128 * edid_struct.blocks 

786 edid_array = create_string_buffer(b"\0" * edid_len) 

787 edid_struct.edid = cast(edid_array, type(edid_struct.edid)) 

788 ioctl(fd, IOC.G_EDID, edid_struct) 

789 return string_at(edid_struct.edid, edid_len) 

790 

791 

792def get_input(fd): 

793 inp = ctypes.c_uint() 

794 ioctl(fd, IOC.G_INPUT, inp) 

795 return inp.value 

796 

797 

798def set_input(fd, index: int): 

799 index = ctypes.c_uint(index) 

800 ioctl(fd, IOC.S_INPUT, index) 

801 

802 

803def get_output(fd): 

804 out = ctypes.c_uint() 

805 ioctl(fd, IOC.G_OUTPUT, out) 

806 return out.value 

807 

808 

809def set_output(fd, index: int): 

810 index = ctypes.c_uint(index) 

811 ioctl(fd, IOC.S_OUTPUT, index) 

812 

813 

814def get_std(fd) -> StandardID: 

815 out = ctypes.c_uint64() 

816 ioctl(fd, IOC.G_STD, out) 

817 return StandardID(out.value) 

818 

819 

820def set_std(fd, std): 

821 ioctl(fd, IOC.S_STD, std) 

822 

823 

824def query_std(fd) -> StandardID: 

825 out = ctypes.c_uint64() 

826 ioctl(fd, IOC.QUERYSTD, out) 

827 return StandardID(out.value) 

828 

829 

830SubdevFormat = collections.namedtuple( 

831 "SubdevFormat", "pad which width height code field colorspace quantization xfer_func flags stream" 

832) 

833 

834 

835def _translate_subdev_format(fmt: raw.v4l2_subdev_format): 

836 return SubdevFormat( 

837 pad=fmt.pad, 

838 which=raw.SubdevFormatWhence(fmt.which), 

839 width=fmt.format.width, 

840 height=fmt.format.height, 

841 code=raw.MbusPixelcode(fmt.format.code), 

842 field=raw.Field(fmt.format.field), 

843 colorspace=raw.Colorspace(fmt.format.colorspace), 

844 quantization=raw.Quantization(fmt.format.quantization), 

845 xfer_func=raw.XferFunc(fmt.format.xfer_func), 

846 flags=raw.MbusFrameFormatFlag(fmt.format.flags), 

847 stream=fmt.stream, 

848 ) 

849 

850 

851def get_subdevice_format(fd, pad: int = 0) -> raw.v4l2_subdev_format: 

852 fmt = raw.v4l2_subdev_format(pad=pad, which=raw.SubdevFormatWhence.ACTIVE) 

853 return _translate_subdev_format(ioctl(fd, IOC.SUBDEV_G_FMT, fmt)) 

854 

855 

856# Helpers 

857 

858 

859def request_and_query_buffer(fd, buffer_type: BufferType, memory: Memory) -> raw.v4l2_buffer: 

860 """request + query buffers""" 

861 buffers = request_and_query_buffers(fd, buffer_type, memory, 1) 

862 return buffers[0] 

863 

864 

865def request_and_query_buffers(fd, buffer_type: BufferType, memory: Memory, count: int) -> list[raw.v4l2_buffer]: 

866 """request + query buffers""" 

867 request_buffers(fd, buffer_type, memory, count) 

868 return [query_buffer(fd, buffer_type, memory, index) for index in range(count)] 

869 

870 

871def mmap_from_buffer(fd, buff: raw.v4l2_buffer) -> mmap.mmap: 

872 return mem_map(fd, buff.length, offset=buff.m.offset) 

873 

874 

875def create_mmap_buffers(fd, buffer_type: BufferType, memory: Memory, count: int) -> list[mmap.mmap]: 

876 """create buffers + mmap_from_buffer""" 

877 return [mmap_from_buffer(fd, buff) for buff in request_and_query_buffers(fd, buffer_type, memory, count)] 

878 

879 

880def create_mmap_buffer(fd, buffer_type: BufferType, memory: Memory) -> mmap.mmap: 

881 return create_mmap_buffers(fd, buffer_type, memory, 1) 

882 

883 

884def enqueue_buffers(fd, buffer_type: BufferType, memory: Memory, count: int) -> list[raw.v4l2_buffer]: 

885 return [enqueue_buffer(fd, buffer_type, memory, 0, index) for index in range(count)] 

886 

887 

888class Device(BaseDevice): 

889 PREFIX = "/dev/video" 

890 

891 def __init__(self, name_or_file, read_write=True, io=IO, blocking=False): 

892 self.info = None 

893 self.controls = None 

894 super().__init__(name_or_file, read_write=read_write, io=io, blocking=blocking) 

895 

896 def __iter__(self): 

897 with VideoCapture(self) as stream: 

898 yield from stream 

899 

900 async def __aiter__(self): 

901 with VideoCapture(self) as stream: 

902 async for frame in stream: 

903 yield frame 

904 

905 def _on_open(self): 

906 self.info = InfoEx(self) 

907 self.controls = Controls.from_device(self) 

908 

909 def query_buffer(self, buffer_type, memory, index): 

910 return query_buffer(self, buffer_type, memory, index) 

911 

912 def enqueue_buffer(self, buffer_type: BufferType, memory: Memory, size: int, index: int) -> raw.v4l2_buffer: 

913 return enqueue_buffer(self, buffer_type, memory, size, index) 

914 

915 def dequeue_buffer(self, buffer_type: BufferType, memory: Memory) -> raw.v4l2_buffer: 

916 return dequeue_buffer(self, buffer_type, memory) 

917 

918 def request_buffers(self, buffer_type, memory, size): 

919 return request_buffers(self, buffer_type, memory, size) 

920 

921 def create_buffers(self, buffer_type: BufferType, memory: Memory, count: int) -> list[raw.v4l2_buffer]: 

922 return request_and_query_buffers(self, buffer_type, memory, count) 

923 

924 def free_buffers(self, buffer_type, memory): 

925 return free_buffers(self, buffer_type, memory) 

926 

927 def enqueue_buffers(self, buffer_type: BufferType, memory: Memory, count: int) -> list[raw.v4l2_buffer]: 

928 return enqueue_buffers(self, buffer_type, memory, count) 

929 

930 def set_format( 

931 self, 

932 buffer_type: BufferType, 

933 width: int, 

934 height: int, 

935 pixel_format: str = "MJPG", 

936 ): 

937 return set_format(self, buffer_type, width, height, pixel_format=pixel_format) 

938 

939 def get_format(self, buffer_type) -> Format: 

940 return get_format(self, buffer_type) 

941 

942 def set_fps(self, buffer_type, fps): 

943 return set_fps(self, buffer_type, fps) 

944 

945 def get_fps(self, buffer_type): 

946 return get_fps(self, buffer_type) 

947 

948 def set_selection(self, buffer_type, target, rectangle): 

949 return set_selection(self, buffer_type, target, rectangle) 

950 

951 def get_selection(self, buffer_type, target): 

952 return get_selection(self, buffer_type, target) 

953 

954 def get_priority(self) -> Priority: 

955 return get_priority(self) 

956 

957 def set_priority(self, priority: Priority): 

958 set_priority(self, priority) 

959 

960 def stream_on(self, buffer_type): 

961 stream_on(self, buffer_type) 

962 

963 def stream_off(self, buffer_type): 

964 stream_off(self, buffer_type) 

965 

966 def write(self, data: bytes) -> None: 

967 self._fobj.write(data) 

968 

969 def subscribe_event( 

970 self, 

971 event_type: EventType = EventType.ALL, 

972 id: int = 0, 

973 flags: EventSubscriptionFlag = 0, 

974 ): 

975 return subscribe_event(self, event_type, id, flags) 

976 

977 def unsubscribe_event(self, event_type: EventType = EventType.ALL, id: int = 0): 

978 return unsubscribe_event(self, event_type, id) 

979 

980 def deque_event(self): 

981 return deque_event(self) 

982 

983 def set_edid(self, edid): 

984 set_edid(self, edid) 

985 

986 def clear_edid(self): 

987 clear_edid(self) 

988 

989 def get_edid(self): 

990 return get_edid(self) 

991 

992 def get_input(self): 

993 return get_input(self) 

994 

995 def set_input(self, index: int): 

996 return set_input(self, index) 

997 

998 def get_output(self): 

999 return get_output(self) 

1000 

1001 def set_output(self, index: int): 

1002 return set_output(self, index) 

1003 

1004 def get_std(self) -> StandardID: 

1005 return get_std(self) 

1006 

1007 def set_std(self, std): 

1008 return set_std(self, std) 

1009 

1010 def query_std(self) -> StandardID: 

1011 return query_std(self) 

1012 

1013 def clone(self) -> Self: 

1014 fd = os.dup(self.fileno()) 

1015 fobj = self.io.os.fdopen(fd, "rb+", buffering=0) 

1016 fobj.name = self._fobj.name 

1017 return type(self)(fobj) 

1018 

1019 

1020class SubDevice(BaseDevice): 

1021 def get_format(self, pad: int = 0) -> SubdevFormat: 

1022 return get_subdevice_format(self, pad=pad) 

1023 

1024 

1025def create_artificial_control_class(class_id): 

1026 return raw.v4l2_query_ext_ctrl( 

1027 id=class_id | 1, 

1028 name=b"Generic Controls", 

1029 type=ControlType.CTRL_CLASS, 

1030 ) 

1031 

1032 

1033class Controls(dict): 

1034 def __init__(self, device: Device): 

1035 super().__init__() 

1036 self.__dict__["_device"] = device 

1037 self.__dict__["_initialized"] = False 

1038 

1039 def _init_if_needed(self): 

1040 if not self._initialized: 

1041 self.refresh() 

1042 self.__dict__["_initialized"] = True 

1043 

1044 def __repr__(self): 

1045 self._init_if_needed() 

1046 return super().__repr__() 

1047 

1048 def __getitem__(self, name): 

1049 self._init_if_needed() 

1050 return super().__getitem__(name) 

1051 

1052 def __len__(self): 

1053 self._init_if_needed() 

1054 return super().__len__() 

1055 

1056 def refresh(self): 

1057 ctrl_type_map = { 

1058 ControlType.BOOLEAN: BooleanControl, 

1059 ControlType.INTEGER: IntegerControl, 

1060 ControlType.INTEGER64: Integer64Control, 

1061 ControlType.MENU: MenuControl, 

1062 ControlType.INTEGER_MENU: MenuControl, 

1063 ControlType.U8: U8Control, 

1064 ControlType.U16: U16Control, 

1065 ControlType.U32: U32Control, 

1066 ControlType.BUTTON: ButtonControl, 

1067 } 

1068 classes = {} 

1069 for ctrl in self._device.info.controls: 

1070 ctrl_type = ControlType(ctrl.type) 

1071 ctrl_class_id = V4L2_CTRL_ID2CLASS(ctrl.id) 

1072 if ctrl_type == ControlType.CTRL_CLASS: 

1073 classes[ctrl_class_id] = ctrl 

1074 else: 

1075 klass = classes.get(ctrl_class_id) 

1076 if klass is None: 

1077 klass = create_artificial_control_class(ctrl_class_id) 

1078 classes[ctrl_class_id] = klass 

1079 has_payload = ControlFlag.HAS_PAYLOAD in ControlFlag(ctrl.flags) 

1080 if has_payload: 

1081 ctrl_class = CompoundControl 

1082 else: 

1083 ctrl_class = ctrl_type_map.get(ctrl_type, GenericControl) 

1084 self[ctrl.id] = ctrl_class(self._device, ctrl, klass) 

1085 

1086 @classmethod 

1087 def from_device(cls, device): 

1088 """Deprecated: backward compatible. Please use Controls(device) constructor directly""" 

1089 return cls(device) 

1090 

1091 def __getattr__(self, key): 

1092 with contextlib.suppress(KeyError): 

1093 return self[key] 

1094 raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{key}'") 

1095 

1096 def __setattr__(self, key, value): 

1097 self._init_if_needed() 

1098 self[key] = value 

1099 

1100 def __missing__(self, key): 

1101 self._init_if_needed() 

1102 for v in self.values(): 

1103 if isinstance(v, BaseControl) and (v.config_name == key): 

1104 return v 

1105 raise KeyError(key) 

1106 

1107 def items(self): 

1108 self._init_if_needed() 

1109 return super().items() 

1110 

1111 def values(self): 

1112 self._init_if_needed() 

1113 return super().values() 

1114 

1115 def used_classes(self): 

1116 class_map = {v.control_class.id: v.control_class for v in self.values() if isinstance(v, BaseControl)} 

1117 return list(class_map.values()) 

1118 

1119 def with_class(self, control_class): 

1120 if isinstance(control_class, str): 

1121 control_class = ControlClass[control_class.upper()] 

1122 elif isinstance(control_class, int): 

1123 control_class = ControlClass(control_class) 

1124 elif not isinstance(control_class, ControlClass): 

1125 control_class = ControlClass(control_class.id - 1) 

1126 for v in self.values(): 

1127 if not isinstance(v, BaseControl): 

1128 continue 

1129 if v.control_class.id - 1 == control_class: 

1130 yield v 

1131 

1132 def set_to_default(self): 

1133 for v in self.values(): 

1134 if not isinstance(v, BaseControl): 

1135 continue 

1136 

1137 with contextlib.suppress(AttributeError): 

1138 v.set_to_default() 

1139 

1140 def set_clipping(self, clipping: bool) -> None: 

1141 for v in self.values(): 

1142 if isinstance(v, BaseNumericControl): 

1143 v.clipping = clipping 

1144 

1145 

1146class BaseControl: 

1147 def __init__(self, device, info, control_class, klass=None): 

1148 self.device = device 

1149 self._info = info 

1150 self.id = self._info.id 

1151 self.name = self._info.name.decode() 

1152 self._config_name = None 

1153 self.control_class = control_class 

1154 self.type = ControlType(self._info.type) 

1155 self.flags = ControlFlag(self._info.flags) 

1156 self.minimum = self._info.minimum 

1157 self.maximum = self._info.maximum 

1158 self.step = self._info.step 

1159 

1160 try: 

1161 self.standard = ControlID(self.id) 

1162 except ValueError: 

1163 self.standard = None 

1164 

1165 def __repr__(self): 

1166 repr = str(self.config_name) 

1167 

1168 addrepr = self._get_repr() 

1169 addrepr = addrepr.strip() 

1170 if addrepr: 

1171 repr += f" {addrepr}" 

1172 

1173 if self.flags: 

1174 flags = [flag.name.lower() for flag in ControlFlag if ((self._info.flags & flag) == flag)] 

1175 repr += " flags=" + ",".join(flags) 

1176 

1177 return f"<{type(self).__name__} {repr}>" 

1178 

1179 def _get_repr(self) -> str: 

1180 return "" 

1181 

1182 def _get_control(self): 

1183 # value = get_controls_values(self.device, [self._info])[0] 

1184 value = get_controls_values(self.device, (self._info,))[0] 

1185 return value 

1186 

1187 def _set_control(self, value): 

1188 if not self.is_writeable: 

1189 reasons = [] 

1190 if self.is_flagged_read_only: 

1191 reasons.append("read-only") 

1192 if self.is_flagged_inactive: 

1193 reasons.append("inactive") 

1194 if self.is_flagged_disabled: 

1195 reasons.append("disabled") 

1196 if self.is_flagged_grabbed: 

1197 reasons.append("grabbed") 

1198 raise AttributeError(f"{self.__class__.__name__} {self.config_name} is not writeable: {', '.join(reasons)}") 

1199 set_controls_values(self.device, ((self._info, value),)) 

1200 

1201 @property 

1202 def config_name(self) -> str: 

1203 if self._config_name is None: 

1204 res = self.name.lower() 

1205 for r in ("(", ")"): 

1206 res = res.replace(r, "") 

1207 for r in (", ", " "): 

1208 res = res.replace(r, "_") 

1209 self._config_name = res 

1210 return self._config_name 

1211 

1212 @property 

1213 def is_flagged_disabled(self) -> bool: 

1214 return ControlFlag.DISABLED in self.flags 

1215 

1216 @property 

1217 def is_flagged_grabbed(self) -> bool: 

1218 return ControlFlag.GRABBED in self.flags 

1219 

1220 @property 

1221 def is_flagged_read_only(self) -> bool: 

1222 return ControlFlag.READ_ONLY in self.flags 

1223 

1224 @property 

1225 def is_flagged_update(self) -> bool: 

1226 return ControlFlag.UPDATE in self.flags 

1227 

1228 @property 

1229 def is_flagged_inactive(self) -> bool: 

1230 return ControlFlag.INACTIVE in self.flags 

1231 

1232 @property 

1233 def is_flagged_slider(self) -> bool: 

1234 return ControlFlag.SLIDER in self.flags 

1235 

1236 @property 

1237 def is_flagged_write_only(self) -> bool: 

1238 return ControlFlag.WRITE_ONLY in self.flags 

1239 

1240 @property 

1241 def is_flagged_volatile(self) -> bool: 

1242 return ControlFlag.VOLATILE in self.flags 

1243 

1244 @property 

1245 def is_flagged_has_payload(self) -> bool: 

1246 return ControlFlag.HAS_PAYLOAD in self.flags 

1247 

1248 @property 

1249 def is_flagged_execute_on_write(self) -> bool: 

1250 return ControlFlag.EXECUTE_ON_WRITE in self.flags 

1251 

1252 @property 

1253 def is_flagged_modify_layout(self) -> bool: 

1254 return ControlFlag.MODIFY_LAYOUT in self.flags 

1255 

1256 @property 

1257 def is_flagged_dynamic_array(self) -> bool: 

1258 return ControlFlag.DYNAMIC_ARRAY in self.flags 

1259 

1260 @property 

1261 def is_writeable(self) -> bool: 

1262 return not (self.is_flagged_read_only or self.is_flagged_disabled or self.is_flagged_grabbed) 

1263 

1264 

1265class BaseMonoControl(BaseControl): 

1266 def _get_repr(self) -> str: 

1267 repr = f" default={self.default}" 

1268 if not self.is_flagged_write_only: 

1269 try: 

1270 repr += f" value={self.value}" 

1271 except Exception as error: 

1272 repr += f" value=<error: {error!r}>" 

1273 return repr 

1274 

1275 def _convert_read(self, value): 

1276 return value 

1277 

1278 @property 

1279 def default(self): 

1280 default = get_controls_values(self.device, (self._info,), raw.ControlWhichValue.DEF_VAL)[0] 

1281 return self._convert_read(default) 

1282 

1283 @property 

1284 def value(self): 

1285 if not self.is_flagged_write_only: 

1286 v = self._get_control() 

1287 return self._convert_read(v) 

1288 

1289 def _convert_write(self, value): 

1290 return value 

1291 

1292 def _mangle_write(self, value): 

1293 return value 

1294 

1295 @value.setter 

1296 def value(self, value): 

1297 v = self._convert_write(value) 

1298 v = self._mangle_write(v) 

1299 self._set_control(v) 

1300 

1301 def set_to_default(self): 

1302 self.value = self.default 

1303 

1304 

1305class GenericControl(BaseMonoControl): 

1306 pass 

1307 

1308 

1309class BaseNumericControl(BaseMonoControl): 

1310 lower_bound = -(2**31) 

1311 upper_bound = 2**31 - 1 

1312 

1313 def __init__(self, device, info, control_class, clipping=True): 

1314 super().__init__(device, info, control_class) 

1315 self.clipping = clipping 

1316 

1317 if self.minimum < self.lower_bound: 

1318 raise RuntimeWarning( 

1319 f"Control {self.config_name}'s claimed minimum value {self.minimum} exceeds lower bound of {self.__class__.__name__}" 

1320 ) 

1321 if self.maximum > self.upper_bound: 

1322 raise RuntimeWarning( 

1323 f"Control {self.config_name}'s claimed maximum value {self.maximum} exceeds upper bound of {self.__class__.__name__}" 

1324 ) 

1325 

1326 def _get_repr(self) -> str: 

1327 repr = f" min={self.minimum} max={self.maximum} step={self.step}" 

1328 repr += super()._get_repr() 

1329 return repr 

1330 

1331 def _convert_read(self, value): 

1332 return value 

1333 

1334 def _convert_write(self, value): 

1335 if isinstance(value, int): 

1336 return value 

1337 else: 

1338 try: 

1339 v = int(value) 

1340 except Exception: 

1341 pass 

1342 else: 

1343 return v 

1344 raise ValueError(f"Failed to coerce {value.__class__.__name__} '{value}' to int") 

1345 

1346 def _mangle_write(self, value): 

1347 if self.clipping: 

1348 if value < self.minimum: 

1349 return self.minimum 

1350 elif value > self.maximum: 

1351 return self.maximum 

1352 else: 

1353 if value < self.minimum: 

1354 raise ValueError(f"Control {self.config_name}: {value} exceeds allowed minimum {self.minimum}") 

1355 elif value > self.maximum: 

1356 raise ValueError(f"Control {self.config_name}: {value} exceeds allowed maximum {self.maximum}") 

1357 return value 

1358 

1359 def increase(self, steps: int = 1): 

1360 self.value += steps * self.step 

1361 

1362 def decrease(self, steps: int = 1): 

1363 self.value -= steps * self.step 

1364 

1365 def set_to_minimum(self): 

1366 self.value = self.minimum 

1367 

1368 def set_to_maximum(self): 

1369 self.value = self.maximum 

1370 

1371 

1372class IntegerControl(BaseNumericControl): 

1373 lower_bound = -(2**31) 

1374 upper_bound = 2**31 - 1 

1375 

1376 

1377class Integer64Control(BaseNumericControl): 

1378 lower_bound = -(2**63) 

1379 upper_bound = 2**63 - 1 

1380 

1381 

1382class U8Control(BaseNumericControl): 

1383 lower_bound = 0 

1384 upper_bound = 2**8 

1385 

1386 

1387class U16Control(BaseNumericControl): 

1388 lower_bound = 0 

1389 upper_bound = 2**16 

1390 

1391 

1392class U32Control(BaseNumericControl): 

1393 lower_bound = 0 

1394 upper_bound = 2**32 

1395 

1396 

1397class BooleanControl(BaseMonoControl): 

1398 _true = ["true", "1", "yes", "on", "enable"] 

1399 _false = ["false", "0", "no", "off", "disable"] 

1400 

1401 def _convert_read(self, value): 

1402 return bool(value) 

1403 

1404 def _convert_write(self, value): 

1405 if isinstance(value, bool): 

1406 return value 

1407 elif isinstance(value, str): 

1408 if value in self._true: 

1409 return True 

1410 elif value in self._false: 

1411 return False 

1412 else: 

1413 try: 

1414 v = bool(value) 

1415 except Exception: 

1416 pass 

1417 else: 

1418 return v 

1419 raise ValueError(f"Failed to coerce {value.__class__.__name__} '{value}' to bool") 

1420 

1421 

1422class MenuControl(BaseMonoControl, UserDict): 

1423 def __init__(self, device, info, control_class): 

1424 BaseControl.__init__(self, device, info, control_class) 

1425 UserDict.__init__(self) 

1426 

1427 if self.type == ControlType.MENU: 

1428 self.data = {item.index: item.name.decode() for item in iter_read_menu(self.device, self)} 

1429 elif self.type == ControlType.INTEGER_MENU: 

1430 self.data = {item.index: item.value for item in iter_read_menu(self.device, self)} 

1431 else: 

1432 raise TypeError(f"MenuControl only supports control types MENU or INTEGER_MENU, but not {self.type.name}") 

1433 

1434 def _convert_write(self, value): 

1435 return int(value) 

1436 

1437 

1438class ButtonControl(BaseControl): 

1439 def push(self): 

1440 self._set_control(1) 

1441 

1442 

1443class CompoundControl(BaseControl): 

1444 @property 

1445 def default(self): 

1446 return get_controls_values(self.device, [self._info], raw.ControlWhichValue.DEF_VAL)[0] 

1447 

1448 @property 

1449 def value(self): 

1450 if not self.is_flagged_write_only: 

1451 return get_controls_values(self.device, [self._info])[0] 

1452 

1453 @value.setter 

1454 def value(self, value): 

1455 set_controls_values(self.device, ((self._info, value),)) 

1456 

1457 def set_to_default(self): 

1458 self.value = self.default 

1459 

1460 

1461class DeviceHelper: 

1462 def __init__(self, device: Device): 

1463 super().__init__() 

1464 self.device = device 

1465 

1466 

1467class InfoEx(DeviceHelper): 

1468 INFO_REPR = """\ 

1469driver = {info.driver} 

1470card = {info.card} 

1471bus = {info.bus_info} 

1472version = {info.version} 

1473capabilities = {capabilities} 

1474device_capabilities = {device_capabilities} 

1475buffers = {buffers} 

1476""" 

1477 

1478 def __init__(self, device: "Device"): 

1479 self.device = device 

1480 self._raw_capabilities_cache = None 

1481 

1482 def __repr__(self): 

1483 dcaps = "|".join(cap.name for cap in flag_items(self.device_capabilities)) 

1484 caps = "|".join(cap.name for cap in flag_items(self.capabilities)) 

1485 buffers = "|".join(buff.name for buff in self.buffers) 

1486 return self.INFO_REPR.format(info=self, capabilities=caps, device_capabilities=dcaps, buffers=buffers) 

1487 

1488 @property 

1489 def raw_capabilities(self) -> raw.v4l2_capability: 

1490 if self._raw_capabilities_cache is None: 

1491 self._raw_capabilities_cache = read_capabilities(self.device) 

1492 return self._raw_capabilities_cache 

1493 

1494 @property 

1495 def driver(self) -> str: 

1496 return self.raw_capabilities.driver.decode() 

1497 

1498 @property 

1499 def card(self) -> str: 

1500 return self.raw_capabilities.card.decode() 

1501 

1502 @property 

1503 def bus_info(self) -> str: 

1504 return self.raw_capabilities.bus_info.decode() 

1505 

1506 @property 

1507 def version_tuple(self) -> tuple: 

1508 caps = self.raw_capabilities 

1509 return ( 

1510 (caps.version & 0xFF0000) >> 16, 

1511 (caps.version & 0x00FF00) >> 8, 

1512 (caps.version & 0x0000FF), 

1513 ) 

1514 

1515 @property 

1516 def version(self) -> str: 

1517 return ".".join(map(str, self.version_tuple)) 

1518 

1519 @property 

1520 def capabilities(self) -> Capability: 

1521 return Capability(self.raw_capabilities.capabilities) 

1522 

1523 @property 

1524 def device_capabilities(self) -> Capability: 

1525 return Capability(self.raw_capabilities.device_caps) 

1526 

1527 @property 

1528 def buffers(self): 

1529 dev_caps = self.device_capabilities 

1530 return [typ for typ in BufferType if Capability[typ.name] in dev_caps] 

1531 

1532 def get_crop_capabilities(self, buffer_type: BufferType) -> CropCapability: 

1533 return read_crop_capabilities(self.device, buffer_type) 

1534 

1535 @property 

1536 def crop_capabilities(self) -> dict[BufferType, CropCapability]: 

1537 buffer_types = CROP_BUFFER_TYPES & set(self.buffers) 

1538 result = {} 

1539 for buffer_type in buffer_types: 

1540 crop_cap = self.get_crop_capabilities(buffer_type) 

1541 if crop_cap is None: 

1542 continue 

1543 result[buffer_type] = crop_cap 

1544 return result 

1545 

1546 @property 

1547 def formats(self): 

1548 img_fmt_buffer_types = IMAGE_FORMAT_BUFFER_TYPES & set(self.buffers) 

1549 return [ 

1550 image_format 

1551 for buffer_type in img_fmt_buffer_types 

1552 for image_format in iter_read_formats(self.device, buffer_type) 

1553 ] 

1554 

1555 def buffer_formats(self, buffer_type) -> list[ImageFormat]: 

1556 return list(iter_read_formats(self.device, buffer_type)) 

1557 

1558 def format_frame_sizes(self, pixel_format) -> list[FrameSize]: 

1559 return list(iter_read_frame_sizes(self.device, pixel_format)) 

1560 

1561 def frame_sizes(self): 

1562 results = [] 

1563 for fmt in self.formats: 

1564 results.extend(self.format_frame_sizes(fmt.pixel_format)) 

1565 return results 

1566 

1567 def fps_intervals(self, pixel_format, width, height) -> list[FrameType]: 

1568 return list(iter_read_frame_intervals(self.device, pixel_format, width, height)) 

1569 

1570 @property 

1571 def frame_types(self): 

1572 pixel_formats = {fmt.pixel_format for fmt in self.formats} 

1573 return list(iter_read_pixel_formats_frame_intervals(self.device, pixel_formats)) 

1574 

1575 @property 

1576 def inputs(self) -> list[Input]: 

1577 return list(iter_read_inputs(self.device)) 

1578 

1579 @property 

1580 def outputs(self): 

1581 return list(iter_read_outputs(self.device)) 

1582 

1583 @property 

1584 def controls(self): 

1585 return list(iter_read_controls(self.device)) 

1586 

1587 @property 

1588 def video_standards(self) -> list[Standard]: 

1589 """List of video standards for the active input""" 

1590 return list(iter_read_video_standards(self.device)) 

1591 

1592 

1593class BufferManager(DeviceHelper): 

1594 def __init__(self, device: Device, buffer_type: BufferType, size: int = 2): 

1595 super().__init__(device) 

1596 self.type = buffer_type 

1597 self.size = size 

1598 self.buffers = None 

1599 self.name = type(self).__name__ 

1600 

1601 def formats(self) -> list: 

1602 formats = self.device.info.formats 

1603 return [fmt for fmt in formats if fmt.type == self.type] 

1604 

1605 def crop_capabilities(self): 

1606 crop_capabilities = self.device.info.crop_capabilities 

1607 return [crop for crop in crop_capabilities if crop.type == self.type] 

1608 

1609 def query_buffer(self, memory, index): 

1610 return self.device.query_buffer(self.type, memory, index) 

1611 

1612 def enqueue_buffer(self, memory: Memory, size: int, index: int) -> raw.v4l2_buffer: 

1613 return self.device.enqueue_buffer(self.type, memory, size, index) 

1614 

1615 def dequeue_buffer(self, memory: Memory) -> raw.v4l2_buffer: 

1616 return self.device.dequeue_buffer(self.type, memory) 

1617 

1618 def enqueue_buffers(self, memory: Memory) -> list[raw.v4l2_buffer]: 

1619 return self.device.enqueue_buffers(self.type, memory, self.size) 

1620 

1621 def free_buffers(self, memory: Memory): 

1622 result = self.device.free_buffers(self.type, memory) 

1623 self.buffers = None 

1624 return result 

1625 

1626 def create_buffers(self, memory: Memory): 

1627 if self.buffers: 

1628 raise V4L2Error("buffers already requested. free first") 

1629 self.buffers = self.device.create_buffers(self.type, memory, self.size) 

1630 return self.buffers 

1631 

1632 def request_buffers(self, memory: Memory): 

1633 if self.buffers: 

1634 raise V4L2Error("buffers already requested. free first") 

1635 self.buffers = self.device.request_buffers(self.type, memory, self.size) 

1636 return self.buffers 

1637 

1638 def set_format(self, width, height, pixel_format="MJPG"): 

1639 return self.device.set_format(self.type, width, height, pixel_format) 

1640 

1641 def get_format(self) -> Format: 

1642 return self.device.get_format(self.type) 

1643 

1644 def set_fps(self, fps): 

1645 return self.device.set_fps(self.type, fps) 

1646 

1647 def get_fps(self): 

1648 return self.device.get_fps(self.type) 

1649 

1650 def set_selection(self, target, rectangle): 

1651 return self.device.set_selection(self.type, target, rectangle) 

1652 

1653 def get_selection(self, target): 

1654 return self.device.get_selection(self.type, target) 

1655 

1656 def stream_on(self): 

1657 self.device.stream_on(self.type) 

1658 

1659 def stream_off(self): 

1660 self.device.stream_off(self.type) 

1661 

1662 start = stream_on 

1663 stop = stream_off 

1664 

1665 

1666class Frame: 

1667 """The resulting object from an acquisition.""" 

1668 

1669 __slots__ = ["format", "buff", "data", "user_data"] 

1670 

1671 def __init__(self, data: bytes, buff: raw.v4l2_buffer, format: Format): 

1672 self.format = format 

1673 self.buff = buff 

1674 self.data = data 

1675 self.user_data = None 

1676 

1677 def __bytes__(self): 

1678 return self.data 

1679 

1680 def __len__(self): 

1681 return len(self.data) 

1682 

1683 def __getitem__(self, index): 

1684 return self.data[index] 

1685 

1686 def __repr__(self) -> str: 

1687 return ( 

1688 f"<{type(self).__name__} width={self.width}, height={self.height}, " 

1689 f"format={self.pixel_format.name}, frame_nb={self.frame_nb}, timestamp={self.timestamp}>" 

1690 ) 

1691 

1692 def __format__(self, spec): 

1693 if spec in {"", "s"}: 

1694 return str(self) 

1695 elif spec == "r": 

1696 return repr(self) 

1697 elif spec == "f": 

1698 return f"{self.width}x{self.height} {self.pixel_format.name}" 

1699 elif spec == "l": 

1700 return f"#{self.frame_nb} {self.timestamp} {self:f}" 

1701 return str(getattr(self, spec)) 

1702 

1703 @property 

1704 def width(self): 

1705 return self.format.width 

1706 

1707 @property 

1708 def height(self): 

1709 return self.format.height 

1710 

1711 @property 

1712 def nbytes(self): 

1713 return self.buff.bytesused 

1714 

1715 @property 

1716 def pixel_format(self): 

1717 return PixelFormat(self.format.pixel_format) 

1718 

1719 @property 

1720 def index(self): 

1721 return self.buff.index 

1722 

1723 @property 

1724 def type(self): 

1725 return BufferType(self.buff.type) 

1726 

1727 @property 

1728 def flags(self): 

1729 return BufferFlag(self.buff.flags) 

1730 

1731 @property 

1732 def timestamp(self): 

1733 return self.buff.timestamp.secs + self.buff.timestamp.usecs * 1e-6 

1734 

1735 @property 

1736 def frame_nb(self): 

1737 return self.buff.sequence 

1738 

1739 @property 

1740 def memory(self): 

1741 return Memory(self.buff.memory) 

1742 

1743 @property 

1744 def time_type(self): 

1745 if BufferFlag.TIMECODE in self.flags: 

1746 return TimeCodeType(self.buff.timecode.type) 

1747 

1748 @property 

1749 def time_flags(self): 

1750 if BufferFlag.TIMECODE in self.flags: 

1751 return TimeCodeFlag(self.buff.timecode.flags) 

1752 

1753 @property 

1754 def time_frame(self): 

1755 if BufferFlag.TIMECODE in self.flags: 

1756 return self.buff.timecode.frames 

1757 

1758 @property 

1759 def array(self): 

1760 import numpy 

1761 

1762 return numpy.frombuffer(self.data, dtype="u1") 

1763 

1764 

1765class VideoCapture(BufferManager): 

1766 def __init__(self, device: Device, size: int = 2, buffer_type=None): 

1767 super().__init__(device, BufferType.VIDEO_CAPTURE, size) 

1768 self.buffer = None 

1769 self._buffer_type = buffer_type 

1770 

1771 def __enter__(self): 

1772 self.open() 

1773 return self 

1774 

1775 def __exit__(self, *exc): 

1776 self.close() 

1777 

1778 def __iter__(self): 

1779 yield from self.buffer 

1780 

1781 async def __aiter__(self): 

1782 async for frame in self.buffer: 

1783 yield frame 

1784 

1785 @property 

1786 def buffer_type(self): 

1787 capabilities = self.device.info.device_capabilities 

1788 if Capability.VIDEO_CAPTURE not in capabilities: 

1789 raise V4L2Error("Device doesn't have VIDEO_CAPTURE capability") 

1790 if self._buffer_type is None: 

1791 if Capability.STREAMING in capabilities: 

1792 return MemoryMap 

1793 elif Capability.READWRITE in capabilities: 

1794 return ReadSource 

1795 return None 

1796 return self._buffer_type 

1797 

1798 def create_buffer(self): 

1799 buffer_type = self.buffer_type 

1800 if buffer_type is None: 

1801 raise OSError("Device needs to support STREAMING or READWRITE capability") 

1802 return buffer_type(self) 

1803 

1804 def arm(self): 

1805 self.device.log.info("Preparing for video capture...") 

1806 self.buffer = self.create_buffer() 

1807 self.buffer.open() 

1808 

1809 def disarm(self): 

1810 self.buffer.close() 

1811 self.buffer = None 

1812 

1813 def open(self): 

1814 if self.buffer is None: 

1815 self.arm() 

1816 self.stream_on() 

1817 self.device.log.info("Video capture started!") 

1818 

1819 def close(self): 

1820 if self.buffer: 

1821 self.device.log.info("Closing video capture...") 

1822 self.stream_off() 

1823 self.disarm() 

1824 self.device.log.info("Video capture closed") 

1825 

1826 

1827class ReadSource(ReentrantOpen): 

1828 def __init__(self, buffer_manager: BufferManager): 

1829 super().__init__() 

1830 self.buffer_manager = buffer_manager 

1831 self.frame_reader = FrameReader(self.device, self.raw_read) 

1832 self.format = None 

1833 

1834 def __iter__(self) -> Iterator[Frame]: 

1835 with self.frame_reader: 

1836 while True: 

1837 yield self.frame_reader.read() 

1838 

1839 async def __aiter__(self) -> AsyncIterator[Frame]: 

1840 async with self.frame_reader: 

1841 while True: 

1842 yield await self.frame_reader.aread() 

1843 

1844 def open(self) -> None: 

1845 self.format = self.buffer_manager.get_format() 

1846 

1847 def close(self) -> None: 

1848 self.format = None 

1849 

1850 @property 

1851 def device(self) -> Device: 

1852 return self.buffer_manager.device 

1853 

1854 def raw_grab(self) -> tuple[bytes, raw.v4l2_buffer]: 

1855 data = os.read(self.device, 2**31 - 1) 

1856 ns = time.time_ns() 

1857 buff = raw.v4l2_buffer() 

1858 buff.bytesused = len(data) 

1859 buff.timestamp.set_ns(ns) 

1860 return data, buff 

1861 

1862 def raw_read(self) -> Frame: 

1863 data, buff = self.raw_grab() 

1864 return Frame(data, buff, self.format) 

1865 

1866 def wait_read(self) -> Frame: 

1867 device = self.device 

1868 if device.io.select is not None: 

1869 device.io.select.select((device,), (), ()) 

1870 return self.raw_read() 

1871 

1872 def read(self) -> Frame: 

1873 # first time we check what mode device was opened (blocking vs non-blocking) 

1874 # if file was opened with O_NONBLOCK: DQBUF will not block until a buffer 

1875 # is available for read. So we need to do it here 

1876 if self.device.is_blocking: 

1877 self.read = self.raw_read 

1878 else: 

1879 self.read = self.wait_read 

1880 return self.read() 

1881 

1882 

1883class MemorySource(ReentrantOpen): 

1884 def __init__(self, buffer_manager: BufferManager, source: Memory): 

1885 super().__init__() 

1886 self.buffer_manager = buffer_manager 

1887 self.source = source 

1888 self.buffers = None 

1889 self.queue = BufferQueue(buffer_manager, source) 

1890 self.frame_reader = FrameReader(self.device, self.raw_read) 

1891 self.format = None 

1892 

1893 def __iter__(self) -> Iterator[Frame]: 

1894 with self.frame_reader: 

1895 yield from self.frame_reader 

1896 

1897 def __aiter__(self) -> AsyncIterator[Frame]: 

1898 return astream(self.device, self.raw_read) 

1899 

1900 @property 

1901 def device(self) -> Device: 

1902 return self.buffer_manager.device 

1903 

1904 def prepare_buffers(self): 

1905 raise NotImplementedError 

1906 

1907 def release_buffers(self): 

1908 self.device.log.info("Freeing buffers...") 

1909 for buf in self.buffers: 

1910 buf.close() 

1911 self.buffers = None 

1912 self.buffer_manager.free_buffers(self.source) 

1913 self.format = None 

1914 self.device.log.info("Buffers freed") 

1915 

1916 def open(self) -> None: 

1917 self.format = self.buffer_manager.get_format() 

1918 if self.buffers is None: 

1919 self.prepare_buffers() 

1920 

1921 def close(self) -> None: 

1922 if self.buffers: 

1923 self.release_buffers() 

1924 

1925 def grab_from_buffer(self, buff: raw.v4l2_buffer, into=None): 

1926 view = self.buffers[buff.index] 

1927 size = buff.bytesused 

1928 if into is None: 

1929 return view[:size], buff 

1930 into[:size] = memoryview(view)[:size] 

1931 return memoryview(into)[:size], buff 

1932 

1933 def raw_grab(self, into=None) -> tuple[Buffer, raw.v4l2_buffer]: 

1934 with self.queue as buff: 

1935 return self.grab_from_buffer(buff, into) 

1936 

1937 def raw_read(self, into=None) -> Frame: 

1938 data, buff = self.raw_grab(into) 

1939 return Frame(data, buff, self.format) 

1940 

1941 def wait_read(self) -> Frame: 

1942 device = self.device 

1943 if device.io.select is not None: 

1944 device.io.select((device,), (), ()) 

1945 return self.raw_read() 

1946 

1947 def read(self) -> Frame: 

1948 # first time we check what mode device was opened (blocking vs non-blocking) 

1949 # if file was opened with O_NONBLOCK: DQBUF will not block until a buffer 

1950 # is available for read. So we need to do it here 

1951 if self.device.is_blocking: 

1952 self.read = self.raw_read 

1953 else: 

1954 self.read = self.wait_read 

1955 return self.read() 

1956 

1957 def raw_write(self, data: Buffer) -> raw.v4l2_buffer: 

1958 with self.queue as buff: 

1959 size = getattr(data, "nbytes", len(data)) 

1960 memory = self.buffers[buff.index] 

1961 memory[:size] = data 

1962 buff.bytesused = size 

1963 return buff 

1964 

1965 def wait_write(self, data: Buffer) -> raw.v4l2_buffer: 

1966 device = self.device 

1967 if device.io.select is not None: 

1968 _, r, _ = device.io.select.select((), (device,), ()) 

1969 return self.raw_write(data) 

1970 

1971 def write(self, data: Buffer) -> raw.v4l2_buffer: 

1972 # first time we check what mode device was opened (blocking vs non-blocking) 

1973 # if file was opened with O_NONBLOCK: DQBUF will not block until a buffer 

1974 # is available for write. So we need to do it here 

1975 if self.device.is_blocking: 

1976 self.write = self.raw_write 

1977 else: 

1978 self.write = self.wait_write 

1979 return self.write(data) 

1980 

1981 

1982class UserPtr(MemorySource): 

1983 def __init__(self, buffer_manager: BufferManager): 

1984 super().__init__(buffer_manager, Memory.USERPTR) 

1985 self.log = self.device.log.getChild("MemoryMap") 

1986 

1987 def prepare_buffers(self): 

1988 self.log.info("Reserving buffers...") 

1989 self.buffer_manager.create_buffers(self.source) 

1990 size = self.format.size 

1991 self.buffers = [] 

1992 for index in range(self.buffer_manager.size): 

1993 data = ctypes.create_string_buffer(size) 

1994 self.buffers.append(data) 

1995 buff = raw.v4l2_buffer() 

1996 buff.index = index 

1997 buff.type = self.buffer_manager.type 

1998 buff.memory = self.source 

1999 buff.m.userptr = ctypes.addressof(data) 

2000 buff.length = size 

2001 self.queue.enqueue(buff) 

2002 self.log.info("Buffers reserved") 

2003 

2004 

2005class MemoryMap(MemorySource): 

2006 def __init__(self, buffer_manager: BufferManager): 

2007 super().__init__(buffer_manager, Memory.MMAP) 

2008 self.log = self.device.log.getChild("MemoryMap") 

2009 

2010 def prepare_buffers(self): 

2011 self.log.info("Reserving buffers...") 

2012 buffers = self.buffer_manager.create_buffers(self.source) 

2013 fd = self.device 

2014 self.buffers = [mmap_from_buffer(fd, buff) for buff in buffers] 

2015 self.buffer_manager.enqueue_buffers(Memory.MMAP) 

2016 self.format = self.buffer_manager.get_format() 

2017 self.log.info("Buffers reserved") 

2018 

2019 

2020class EventReader(ReentrantOpen): 

2021 def __init__(self, device: Device, max_queue_size=100): 

2022 super().__init__() 

2023 self.device = device 

2024 self._loop = None 

2025 self._selector = None 

2026 self._buffer = None 

2027 self._max_queue_size = max_queue_size 

2028 

2029 async def __aenter__(self): 

2030 if self.device.is_blocking: 

2031 raise V4L2Error("Cannot use async event reader on blocking device") 

2032 self._buffer = asyncio.Queue(maxsize=self._max_queue_size) 

2033 self._selector = select.epoll() 

2034 self._loop = asyncio.get_event_loop() 

2035 self._loop.add_reader(self._selector, self._on_event) 

2036 self._selector.register(self.device, select.EPOLLPRI) 

2037 return self 

2038 

2039 async def __aexit__(self, exc_type, exc_value, traceback): 

2040 self._selector.unregister(self.device) 

2041 self._loop.remove_reader(self._selector) 

2042 self._selector.close() 

2043 self._selector = None 

2044 self._loop = None 

2045 self._buffer = None 

2046 

2047 async def __aiter__(self): 

2048 while True: 

2049 yield await self.aread() 

2050 

2051 def __iter__(self): 

2052 device = self.device 

2053 with self: 

2054 if device.is_blocking: 

2055 while True: 

2056 yield device.deque_event() 

2057 else: 

2058 while True: 

2059 for _ in self._selector.poll(): 

2060 yield device.deque_event() 

2061 

2062 def _on_event(self): 

2063 task = self._loop.create_future() 

2064 try: 

2065 self._selector.poll(0) # avoid blocking 

2066 event = self.device.deque_event() 

2067 task.set_result(event) 

2068 except Exception as error: 

2069 task.set_exception(error) 

2070 

2071 buffer = self._buffer 

2072 if buffer.full(): 

2073 self.device.log.debug("missed event") 

2074 buffer.popleft() 

2075 buffer.put_nowait(task) 

2076 

2077 def open(self): 

2078 if not self.device.is_blocking: 

2079 self._selector = select.epoll() 

2080 self._selector.register(self.device, select.EPOLLPRI) 

2081 

2082 def close(self): 

2083 if self._selector: 

2084 self._selector.close() 

2085 self._selector = None 

2086 

2087 def fileno(self) -> int: 

2088 """Return the underlying file descriptor (an integer) of the stream if it exists""" 

2089 return self._selector.fileno() 

2090 

2091 def read(self, timeout=None): 

2092 device = self.device 

2093 if not device.is_blocking: 

2094 _, _, exc = device.io.select.select((), (), (self.device,), timeout) 

2095 if not exc: 

2096 return 

2097 return device.deque_event() 

2098 

2099 async def aread(self): 

2100 """Wait for next event or return last event in queue""" 

2101 task = await self._buffer.get() 

2102 return await task 

2103 

2104 

2105class FrameReader: 

2106 def __init__(self, device: Device, raw_read: Callable[[], Buffer], max_queue_size: int = 1): 

2107 self.device = device 

2108 self.raw_read = raw_read 

2109 self._loop = None 

2110 self._selector = None 

2111 self._buffer = None 

2112 self._max_queue_size = max_queue_size 

2113 self._device_fd = None 

2114 

2115 async def __aenter__(self) -> Self: 

2116 if self.device.is_blocking: 

2117 raise V4L2Error("Cannot use async frame reader on blocking device") 

2118 self._device_fd = self.device 

2119 self._buffer = asyncio.Queue(maxsize=self._max_queue_size) 

2120 self._selector = select.epoll() 

2121 self._loop = asyncio.get_event_loop() 

2122 self._loop.add_reader(self._selector, self._on_event) 

2123 self._selector.register(self._device_fd, select.POLLIN) 

2124 return self 

2125 

2126 async def __aexit__(self, exc_type, exc_value, traceback): 

2127 with contextlib.suppress(OSError): 

2128 # device may have been closed by now 

2129 self._selector.unregister(self._device_fd) 

2130 self._loop.remove_reader(self._selector) 

2131 self._selector.close() 

2132 self._selector = None 

2133 self._loop = None 

2134 self._buffer = None 

2135 

2136 def __enter__(self) -> Self: 

2137 return self 

2138 

2139 def __exit__(self, exc_type, exc_value, tb): 

2140 pass 

2141 

2142 def __iter__(self): 

2143 while True: 

2144 yield self.read() 

2145 

2146 def _on_event(self) -> None: 

2147 task = self._loop.create_future() 

2148 try: 

2149 self._selector.poll(0) # avoid blocking 

2150 data = self.raw_read() 

2151 task.set_result(data) 

2152 except Exception as error: 

2153 task.set_exception(error) 

2154 

2155 buffer = self._buffer 

2156 if buffer.full(): 

2157 self.device.log.warn("missed frame") 

2158 buffer.get_nowait() 

2159 buffer.put_nowait(task) 

2160 

2161 def read(self, timeout: Optional[float] = None) -> Frame: 

2162 if not self.device.is_blocking: 

2163 read, _, _ = self.device.io.select.select((self.device,), (), (), timeout) 

2164 if not read: 

2165 return 

2166 return self.raw_read() 

2167 

2168 async def aread(self) -> Frame: 

2169 """Wait for next frame or return last frame""" 

2170 task = await self._buffer.get() 

2171 return await task 

2172 

2173 

2174class BufferQueue: 

2175 def __init__(self, buffer_manager: BufferManager, memory: Memory): 

2176 self.buffer_manager = buffer_manager 

2177 self.memory = memory 

2178 self.raw_buffer = None 

2179 

2180 def enqueue(self, buff: raw.v4l2_buffer): 

2181 enqueue_buffer_raw(self.buffer_manager.device, buff) 

2182 

2183 def dequeue(self): 

2184 return self.buffer_manager.dequeue_buffer(self.memory) 

2185 

2186 def __enter__(self) -> raw.v4l2_buffer: 

2187 # get next buffer that has some data in it 

2188 self.raw_buffer = self.dequeue() 

2189 return self.raw_buffer 

2190 

2191 def __exit__(self, *exc): 

2192 # Make a copy of buffer. We need the original buffer that was sent to 

2193 # dequeue in to keep frame info like frame number, timestamp, etc 

2194 raw_buffer = raw.v4l2_buffer() 

2195 memcpy(raw_buffer, self.raw_buffer) 

2196 self.enqueue(raw_buffer) 

2197 

2198 

2199class Write(ReentrantOpen): 

2200 def __init__(self, buffer_manager: BufferManager): 

2201 super().__init__() 

2202 self.buffer_manager = buffer_manager 

2203 

2204 @property 

2205 def device(self) -> Device: 

2206 return self.buffer_manager.device 

2207 

2208 def raw_write(self, data: Buffer) -> None: 

2209 self.device.write(data) 

2210 

2211 def wait_write(self, data: Buffer) -> None: 

2212 device = self.device 

2213 if device.io.select is not None: 

2214 _, w, _ = device.io.select((), (device,), ()) 

2215 if not w: 

2216 raise OSError("Closed") 

2217 self.raw_write(data) 

2218 

2219 def write(self, data: Buffer) -> None: 

2220 # first time we check what mode device was opened (blocking vs non-blocking) 

2221 # if file was opened with O_NONBLOCK: DQBUF will not block until a buffer 

2222 # is available for write. So we need to do it here 

2223 if self.device.is_blocking: 

2224 self.write = self.raw_write 

2225 else: 

2226 self.write = self.wait_write 

2227 return self.write(data) 

2228 

2229 def open(self) -> None: 

2230 pass 

2231 

2232 def close(self) -> None: 

2233 pass 

2234 

2235 

2236class VideoOutput(BufferManager): 

2237 def __init__(self, device: Device, size: int = 2, sink: Capability = None): 

2238 super().__init__(device, BufferType.VIDEO_OUTPUT, size) 

2239 self.buffer = None 

2240 self.sink = sink 

2241 

2242 def __enter__(self) -> Self: 

2243 self.open() 

2244 return self 

2245 

2246 def __exit__(self, *exc) -> None: 

2247 self.close() 

2248 

2249 def open(self) -> None: 

2250 if self.buffer is not None: 

2251 return 

2252 self.device.log.info("Preparing for video output...") 

2253 capabilities = self.device.info.device_capabilities 

2254 # Don't check for output capability. Some drivers (ex: v4l2loopback) don't 

2255 # report being output capable so that apps like zoom recognize them 

2256 # if Capability.VIDEO_OUTPUT not in capabilities: 

2257 # raise V4L2Error("device lacks VIDEO_OUTPUT capability") 

2258 sink = capabilities if self.sink is None else self.sink 

2259 if Capability.STREAMING in sink: 

2260 self.device.log.info("Video output using memory map") 

2261 self.buffer = MemoryMap(self) 

2262 elif Capability.READWRITE in sink: 

2263 self.device.log.info("Video output using write") 

2264 self.buffer = Write(self) 

2265 else: 

2266 raise OSError("Device needs to support STREAMING or READWRITE capability") 

2267 self.buffer.open() 

2268 self.stream_on() 

2269 self.device.log.info("Video output started!") 

2270 

2271 def close(self) -> None: 

2272 if self.buffer: 

2273 self.device.log.info("Closing video output...") 

2274 try: 

2275 self.stream_off() 

2276 except Exception as error: 

2277 self.device.log.warning("Failed to close stream: %r", error) 

2278 try: 

2279 self.buffer.close() 

2280 except Exception as error: 

2281 self.device.log.warning("Failed to close buffer: %r", error) 

2282 self.buffer = None 

2283 self.device.log.info("Video output closed") 

2284 

2285 def write(self, data: Buffer) -> None: 

2286 self.buffer.write(data) 

2287 

2288 

2289def iter_video_files(path: PathLike = "/dev") -> Iterable[Path]: 

2290 """Returns an iterator over all video files""" 

2291 return iter_device_files(path=path, pattern="video*") 

2292 

2293 

2294def iter_devices(path: PathLike = "/dev", **kwargs) -> Iterable[Device]: 

2295 """Returns an iterator over all video devices""" 

2296 return (Device(name, **kwargs) for name in iter_video_files(path=path)) 

2297 

2298 

2299def iter_video_capture_files(path: PathLike = "/dev") -> Iterable[Path]: 

2300 """Returns an iterator over all video files that have CAPTURE capability""" 

2301 

2302 def filt(filename): 

2303 with IO.open(filename) as fobj: 

2304 caps = read_capabilities(fobj) 

2305 return Capability.VIDEO_CAPTURE in Capability(caps.device_caps) 

2306 

2307 return filter(filt, iter_video_files(path)) 

2308 

2309 

2310def iter_video_capture_devices(path: PathLike = "/dev", **kwargs) -> Iterable[Device]: 

2311 """Returns an iterator over all video devices that have CAPTURE capability""" 

2312 return (Device(name, **kwargs) for name in iter_video_capture_files(path)) 

2313 

2314 

2315def iter_video_output_files(path: PathLike = "/dev") -> Iterable[Path]: 

2316 """ 

2317 Some drivers (ex: v4l2loopback) don't report being output capable so that 

2318 apps like zoom recognize them as valid capture devices so some results might 

2319 be missing 

2320 """ 

2321 

2322 def filt(filename): 

2323 with IO.open(filename) as fobj: 

2324 caps = read_capabilities(fobj) 

2325 return Capability.VIDEO_OUTPUT in Capability(caps.device_caps) 

2326 

2327 return filter(filt, iter_video_files(path)) 

2328 

2329 

2330def iter_video_output_devices(path: PathLike = "/dev", **kwargs) -> Iterable[Device]: 

2331 """Returns an iterator over all video devices that have VIDEO OUTPUT capability""" 

2332 return (Device(name, **kwargs) for name in iter_video_output_files(path)) 

2333 

2334 

2335find = make_find(iter_devices)