Coverage for linuxpy/video/device.py: 81%

1491 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-10-18 07:56 +0200

1# 

2# This file is part of the linuxpy project 

3# 

4# Copyright (c) 2023 Tiago Coutinho 

5# Distributed under the GPLv3 license. See LICENSE for more info. 

6 

7"""Human friendly interface to V4L2 (Video 4 Linux 2) subsystem.""" 

8 

9import asyncio 

10import collections 

11import contextlib 

12import copy 

13import ctypes 

14import enum 

15import errno 

16import fractions 

17import logging 

18import mmap 

19import os 

20import select 

21import time 

22from collections import UserDict 

23from pathlib import Path 

24 

25from linuxpy.ctypes import cast, cenum, create_string_buffer, memcpy, string_at 

26from linuxpy.device import ( 

27 BaseDevice, 

28 ReentrantOpen, 

29 iter_device_files, 

30) 

31from linuxpy.io import IO 

32from linuxpy.ioctl import ioctl 

33from linuxpy.types import AsyncIterator, Buffer, Callable, Iterable, Iterator, Optional, PathLike, Self 

34from linuxpy.util import astream, bit_indexes, make_find 

35 

36from . import raw 

37 

38log = logging.getLogger(__name__) 

39log_mmap = log.getChild("mmap") 

40 

41 

class V4L2Error(Exception):
    """Error raised for Video for Linux 2 (V4L2) failures."""

44 

45 

def _enum(name, prefix, klass=enum.IntEnum):
    """Build an enum named *name* from the ``raw`` attributes starting with *prefix*.

    Each member is named after the attribute with *prefix* removed and takes
    the attribute's value. *klass* selects the enum base class.
    """
    members = []
    for attr in dir(raw):
        if attr.startswith(prefix):
            members.append((attr.replace(prefix, ""), getattr(raw, attr)))
    return klass(name, members)

51 

52 

# Friendlier aliases for the enumerations declared in the ``raw`` module.
FrameSizeType = raw.Frmsizetypes
FrameIntervalType = raw.Frmivaltypes
Field = raw.Field
ImageFormatFlag = raw.ImageFormatFlag
Capability = raw.Capability
ControlID = raw.ControlID
ControlFlag = raw.ControlFlag
ControlType = raw.CtrlType
ControlClass = raw.ControlClass
SelectionTarget = raw.SelectionTarget
EventType = raw.EventType
EventControlChange = raw.EventControlChange
IOC = raw.IOC
BufferType = raw.BufType
BufferFlag = raw.BufferFlag
InputType = raw.InputType
PixelFormat = raw.PixelFormat
MetaFormat = raw.MetaFormat
Memory = raw.Memory
InputStatus = raw.InputStatus
OutputType = raw.OutputType
InputCapabilities = raw.InputCapabilities
OutputCapabilities = raw.OutputCapabilities
Priority = raw.Priority
TimeCodeType = raw.TimeCodeType
TimeCodeFlag = raw.TimeCodeFlag
EventSubscriptionFlag = raw.EventSubscriptionFlag
StandardID = raw.StandardID

82 

83 

def V4L2_CTRL_ID2CLASS(id_):
    """Return *id_* masked with ``0x0FFF0000`` (the control class bits)."""
    class_mask = 0x0FFF0000  # unsigned long
    return id_ & class_mask

86 

87 

def human_pixel_format(ifmt):
    """Decode a 32-bit FourCC integer into its four-character string.

    The least significant byte becomes the first character.
    """
    chars = []
    for shift in (0, 8, 16, 24):
        chars.append(chr((ifmt >> shift) & 0xFF))
    return "".join(chars)

90 

91 

# Give both format enums a FourCC pretty-printer based on the member value.
PixelFormat.human_str = lambda fmt: human_pixel_format(fmt.value)
MetaFormat.human_str = lambda fmt: human_pixel_format(fmt.value)

94 

95 

# Plain record types returned by the helpers in this module.
ImageFormat = collections.namedtuple("ImageFormat", ["type", "description", "flags", "pixel_format"])

MetaFmt = collections.namedtuple(
    "MetaFmt",
    ["format", "max_buffer_size", "width", "height", "bytes_per_line"],
)

Format = collections.namedtuple("Format", ["width", "height", "pixel_format", "size"])

CropCapability = collections.namedtuple("CropCapability", ["type", "bounds", "defrect", "pixel_aspect"])

Rect = collections.namedtuple("Rect", ["left", "top", "width", "height"])

Size = collections.namedtuple("Size", ["width", "height"])

FrameType = collections.namedtuple(
    "FrameType",
    ["type", "pixel_format", "width", "height", "min_fps", "max_fps", "step_fps"],
)

# NOTE: the tuple type names below ("InputType"/"OutputType") differ from the
# module-level names they are bound to.
Input = collections.namedtuple(
    "InputType",
    ["index", "name", "type", "audioset", "tuner", "std", "status", "capabilities"],
)

Output = collections.namedtuple(
    "OutputType",
    ["index", "name", "type", "audioset", "modulator", "std", "capabilities"],
)

Standard = collections.namedtuple("Standard", ["index", "id", "name", "frameperiod", "framelines"])

115 

116 

# Buffer types listed for cropping.
CROP_BUFFER_TYPES = {
    BufferType.VIDEO_CAPTURE,
    BufferType.VIDEO_CAPTURE_MPLANE,
    BufferType.VIDEO_OUTPUT,
    BufferType.VIDEO_OUTPUT_MPLANE,
    BufferType.VIDEO_OVERLAY,
}

# Buffer types listed for image formats: the crop ones plus the metadata ones.
IMAGE_FORMAT_BUFFER_TYPES = CROP_BUFFER_TYPES | {
    BufferType.META_CAPTURE,
    BufferType.META_OUTPUT,
}

134 

135 

def mem_map(fd, length, offset):
    """Memory-map *length* bytes of *fd* starting at *offset*, logging the request."""
    log_mmap.debug("%s, length=%d, offset=%d", fd, length, offset)
    mapping = mmap.mmap(fd, length, offset=offset)
    return mapping

139 

140 

def flag_items(flag):
    """Return the members of *flag*'s enum type that are contained in *flag*."""
    members = []
    for candidate in type(flag):
        if candidate in flag:
            members.append(candidate)
    return members

143 

144 

def raw_crop_caps_to_crop_caps(crop):
    """Convert a raw crop-capability struct into a ``CropCapability`` tuple.

    ``pixel_aspect`` is the numerator/denominator ratio of ``crop.pixelaspect``.
    """

    def as_rect(area):
        return Rect(area.left, area.top, area.width, area.height)

    aspect = crop.pixelaspect
    return CropCapability(
        type=BufferType(crop.type),
        bounds=as_rect(crop.bounds),
        defrect=as_rect(crop.defrect),
        pixel_aspect=aspect.numerator / aspect.denominator,
    )


# Expose the converter as an alternate constructor on the tuple type.
CropCapability.from_raw = raw_crop_caps_to_crop_caps

165 

166 

def raw_read_crop_capabilities(fd, buffer_type: BufferType) -> raw.v4l2_cropcap:
    """Issue the CROPCAP ioctl for *buffer_type* and return the ioctl result."""
    request = raw.v4l2_cropcap()
    request.type = buffer_type
    result = ioctl(fd, IOC.CROPCAP, request)
    return result

171 

172 

def read_crop_capabilities(fd, buffer_type: BufferType) -> Optional[CropCapability]:
    """Read the crop capabilities of *buffer_type*.

    Returns:
        A ``CropCapability``, or ``None`` when the ioctl fails with ``ENODATA``.

    Raises:
        OSError: for any other ioctl failure.
    """
    try:
        crop = raw_read_crop_capabilities(fd, buffer_type)
    except OSError as error:
        if error.errno == errno.ENODATA:
            return None
        raise
    return raw_crop_caps_to_crop_caps(crop)

181 

182 

# errno values that silently end an enumeration.
ITER_BREAK = (errno.ENOTTY, errno.ENODATA, errno.EPIPE)


def iter_read(fd, ioc, indexed_struct, start=0, stop=128, step=1, ignore_einval=False):
    """Enumerate an indexed ioctl, yielding *indexed_struct* after each successful call.

    The same struct object is reused for every index, so consumers that keep
    results must copy them. ``EINVAL`` skips the index when *ignore_einval* is
    true and otherwise ends the enumeration; the errnos in ``ITER_BREAK`` end
    it too. Any other ``OSError`` propagates.
    """
    for position in range(start, stop, step):
        indexed_struct.index = position
        try:
            ioctl(fd, ioc, indexed_struct)
            yield indexed_struct
        except OSError as error:
            code = error.errno
            if code == errno.EINVAL and ignore_einval:
                continue
            if code == errno.EINVAL or code in ITER_BREAK:
                break
            raise

202 

203 

def _interval_to_fps(interval):
    """Return ``1 / interval`` as an exact fraction, or 0 when its numerator is 0."""
    if interval.numerator == 0:
        return 0
    return fractions.Fraction(interval.denominator, interval.numerator)


def iter_read_frame_intervals(fd, fmt, w, h):
    """Yield a ``FrameType`` per frame interval reported for *fmt* at size *w* x *h*.

    The device reports frame intervals; they are converted to frames per
    second (fps = 1 / interval) as exact fractions. If no interval could be
    read, a single DISCRETE entry with all fps values set to 0 is yielded so
    that the frame size is still reported.
    """
    value = raw.v4l2_frmivalenum()
    value.pixel_format = fmt
    value.width = w
    value.height = h
    count = 0
    for val in iter_read(fd, IOC.ENUM_FRAMEINTERVALS, value):
        # values come in frame interval (fps = 1/interval)
        try:
            ftype = FrameIntervalType(val.type)
        except ValueError:
            break
        if ftype == FrameIntervalType.DISCRETE:
            # Build the fraction from the integer pair (not from a float
            # division) so rates such as 30000/1001 stay exact.
            min_fps = max_fps = step_fps = _interval_to_fps(val.discrete)
        else:
            min_fps = _interval_to_fps(val.stepwise.min)
            max_fps = _interval_to_fps(val.stepwise.max)
            step_fps = _interval_to_fps(val.stepwise.step)
        yield FrameType(
            type=ftype,
            pixel_format=fmt,
            width=w,
            height=h,
            min_fps=min_fps,
            max_fps=max_fps,
            step_fps=step_fps,
        )
        count += 1
    if not count:
        # If it wasn't possible to get frame interval, report discovered frame size anyway
        yield FrameType(
            type=FrameIntervalType.DISCRETE,
            pixel_format=fmt,
            width=w,
            height=h,
            min_fps=0,
            max_fps=0,
            step_fps=0,
        )

252 

253 

def iter_read_discrete_frame_sizes(fd, pixel_format):
    """Yield frame-size entries for *pixel_format* until a non-DISCRETE one is seen.

    The yielded object is the single request struct reused across iterations.
    """
    request = raw.v4l2_frmsizeenum()
    request.index = 0
    request.pixel_format = pixel_format
    for entry in iter_read(fd, IOC.ENUM_FRAMESIZES, request):
        if request.type != FrameSizeType.DISCRETE:
            return
        yield entry

262 

263 

def iter_read_pixel_formats_frame_intervals(fd, pixel_formats):
    """Yield the frame intervals of every discrete frame size of each pixel format."""
    for candidate in pixel_formats:
        for frame_size in iter_read_discrete_frame_sizes(fd, candidate):
            dims = frame_size.discrete
            yield from iter_read_frame_intervals(fd, candidate, dims.width, dims.height)

268 

269 

def read_capabilities(fd):
    """Run the QUERYCAP ioctl on *fd* and return the capability struct passed to it."""
    capability = raw.v4l2_capability()
    ioctl(fd, IOC.QUERYCAP, capability)
    return capability

274 

275 

def iter_read_formats(fd, type):
    """Yield an ``ImageFormat`` for each known format the device enumerates for *type*.

    Metadata buffer types are decoded with ``MetaFormat``; every other buffer
    type (including the multi-planar and overlay ones) is decoded with
    ``PixelFormat``. Formats not present in the matching enum are logged and
    skipped.
    """
    format = raw.v4l2_fmtdesc()
    format.type = type
    if type in {BufferType.META_CAPTURE, BufferType.META_OUTPUT}:
        format_enum = MetaFormat
        unknown_msg = "ignored unknown meta format %s (%d)"
    else:
        # Previously only VIDEO_CAPTURE/VIDEO_OUTPUT were handled here, which
        # left the decoded format unbound for any other buffer type.
        format_enum = PixelFormat
        unknown_msg = "ignored unknown pixel format %s (%d)"
    known_formats = set(format_enum)
    for fmt in iter_read(fd, IOC.ENUM_FMT, format):
        pixel_fmt = fmt.pixelformat
        if pixel_fmt not in known_formats:
            log.warning(unknown_msg, human_pixel_format(pixel_fmt), pixel_fmt)
            continue
        yield ImageFormat(
            type=type,
            flags=ImageFormatFlag(fmt.flags),
            description=fmt.description.decode(),
            pixel_format=format_enum(pixel_fmt),
        )

308 

309 

def iter_read_inputs(fd):
    """Yield an ``Input`` tuple for each entry enumerated by the ENUMINPUT ioctl."""
    request = raw.v4l2_input()
    for entry in iter_read(fd, IOC.ENUMINPUT, request):
        yield Input(
            index=entry.index,
            name=entry.name.decode(),
            type=InputType(entry.type),
            audioset=bit_indexes(entry.audioset),
            tuner=entry.tuner,
            std=StandardID(entry.std),
            status=InputStatus(entry.status),
            capabilities=InputCapabilities(entry.capabilities),
        )

324 

325 

def iter_read_outputs(fd):
    """Yield an ``Output`` tuple for each entry enumerated by the ENUMOUTPUT ioctl."""
    request = raw.v4l2_output()
    for entry in iter_read(fd, IOC.ENUMOUTPUT, request):
        yield Output(
            index=entry.index,
            name=entry.name.decode(),
            type=OutputType(entry.type),
            audioset=bit_indexes(entry.audioset),
            modulator=entry.modulator,
            std=StandardID(entry.std),
            capabilities=OutputCapabilities(entry.capabilities),
        )

339 

340 

def iter_read_video_standards(fd):
    """Yield a ``Standard`` tuple for each entry enumerated by the ENUMSTD ioctl.

    ``frameperiod`` is stored as the fraction denominator/numerator of the raw
    frame period.
    """
    request = raw.v4l2_standard()
    for entry in iter_read(fd, IOC.ENUMSTD, request):
        raw_period = entry.frameperiod
        yield Standard(
            index=entry.index,
            id=StandardID(entry.id),
            name=entry.name.decode(),
            frameperiod=fractions.Fraction(raw_period.denominator, raw_period.numerator),
            framelines=entry.framelines,
        )

352 

353 

def iter_read_controls(fd):
    """Yield a deep copy of each extended control description found on the device.

    After each yield the NEXT_CTRL | NEXT_COMPOUND flags are OR-ed back into
    the id of the shared query struct before the next query.
    """
    query = raw.v4l2_query_ext_ctrl()
    next_flags = ControlFlag.NEXT_CTRL | ControlFlag.NEXT_COMPOUND
    query.id = next_flags
    for found in iter_read(fd, IOC.QUERY_EXT_CTRL, query):
        snapshot = copy.deepcopy(found)
        yield snapshot
        found.id |= next_flags

361 

362 

def iter_read_menu(fd, ctrl):
    """Yield a deep copy of each menu entry of *ctrl*.

    Indexes run from ``ctrl._info.minimum`` to ``ctrl._info.maximum``
    (inclusive) in steps of ``ctrl._info.step``; indexes rejected with
    ``EINVAL`` are skipped.
    """
    query = raw.v4l2_querymenu()
    query.id = ctrl.id
    info = ctrl._info
    entries = iter_read(
        fd,
        IOC.QUERYMENU,
        query,
        start=info.minimum,
        stop=info.maximum + 1,
        step=info.step,
        ignore_einval=True,
    )
    for entry in entries:
        yield copy.deepcopy(entry)

376 

377 

def query_buffer(fd, buffer_type: BufferType, memory: Memory, index: int) -> raw.v4l2_buffer:
    """Run the QUERYBUF ioctl for buffer *index* and return the buffer struct."""
    info = raw.v4l2_buffer()
    info.index = index
    info.type = buffer_type
    info.memory = memory
    info.reserved = 0
    ioctl(fd, IOC.QUERYBUF, info)
    return info

386 

387 

def enqueue_buffer_raw(fd, buff: raw.v4l2_buffer) -> raw.v4l2_buffer:
    """Queue *buff* with the QBUF ioctl and return it.

    When the timestamp's ``secs`` field is zero, ``set_ns()`` is called on the
    timestamp first.
    """
    stamp = buff.timestamp
    if not stamp.secs:
        stamp.set_ns()
    ioctl(fd, IOC.QBUF, buff)
    return buff

393 

394 

def enqueue_buffer(fd, buffer_type: BufferType, memory: Memory, size: int, index: int) -> raw.v4l2_buffer:
    """Build a buffer struct for *index* with *size* bytes used and queue it."""
    descriptor = raw.v4l2_buffer()
    descriptor.index = index
    descriptor.type = buffer_type
    descriptor.memory = memory
    descriptor.bytesused = size
    descriptor.field = Field.NONE
    descriptor.reserved = 0
    return enqueue_buffer_raw(fd, descriptor)

404 

405 

def dequeue_buffer(fd, buffer_type: BufferType, memory: Memory) -> raw.v4l2_buffer:
    """Run the DQBUF ioctl and return the buffer struct passed to it."""
    descriptor = raw.v4l2_buffer()
    descriptor.index = 0
    descriptor.type = buffer_type
    descriptor.memory = memory
    descriptor.reserved = 0
    ioctl(fd, IOC.DQBUF, descriptor)
    return descriptor

414 

415 

def request_buffers(fd, buffer_type: BufferType, memory: Memory, count: int) -> raw.v4l2_requestbuffers:
    """Request *count* buffers with the REQBUFS ioctl and return the request struct.

    Raises:
        OSError: if the struct's ``count`` is zero after the call.
    """
    request = raw.v4l2_requestbuffers()
    request.count = count
    request.type = buffer_type
    request.memory = memory
    ioctl(fd, IOC.REQBUFS, request)
    if request.count:
        return request
    raise OSError("Not enough buffer memory")

425 

426 

def free_buffers(fd, buffer_type: BufferType, memory: Memory) -> raw.v4l2_requestbuffers:
    """Issue REQBUFS with a count of zero and return the request struct."""
    request = raw.v4l2_requestbuffers()
    request.count = 0
    request.type = buffer_type
    request.memory = memory
    ioctl(fd, IOC.REQBUFS, request)
    return request

434 

435 

def export_buffer(fd, buffer_type: BufferType, index: int) -> int:
    """Run the EXPBUF ioctl for buffer *index* and return the ``fd`` field of the result."""
    request = raw.v4l2_exportbuffer(type=buffer_type, index=index)
    exported = ioctl(fd, IOC.EXPBUF, request)
    return exported.fd

439 

440 

def create_buffers(fd, format: raw.v4l2_format, memory: Memory, count: int) -> raw.v4l2_create_buffers:
    """Create buffers for Memory Mapped or User Pointer or DMA Buffer I/O.

    Raises:
        OSError: if the struct's ``count`` is zero after the CREATE_BUFS ioctl.
    """
    request = raw.v4l2_create_buffers()
    request.count = count
    request.format = format
    request.memory = memory
    ioctl(fd, IOC.CREATE_BUFS, request)
    if request.count:
        return request
    raise OSError("Not enough buffer memory")

451 

452 

def set_raw_format(fd, fmt: raw.v4l2_format):
    """Apply *fmt* with the S_FMT ioctl and return the ioctl result."""
    result = ioctl(fd, IOC.S_FMT, fmt)
    return result

455 

456 

def set_format(fd, buffer_type: BufferType, width: int, height: int, pixel_format: str = "MJPG"):
    """Set the pixel format and frame size of *buffer_type*.

    *pixel_format* may be a FourCC string (converted with ``raw.v4l2_fourcc``)
    or an already numeric format code.
    """
    request = raw.v4l2_format()
    if isinstance(pixel_format, str):
        pixel_format = raw.v4l2_fourcc(*pixel_format)
    request.type = buffer_type
    pix = request.fmt.pix
    pix.pixelformat = pixel_format
    pix.field = Field.ANY
    pix.width = width
    pix.height = height
    pix.bytesperline = 0
    pix.sizeimage = 0
    return set_raw_format(fd, request)

469 

470 

def get_raw_format(fd, buffer_type) -> raw.v4l2_format:
    """Run the G_FMT ioctl for *buffer_type* and return the format struct."""
    current = raw.v4l2_format()
    current.type = buffer_type
    ioctl(fd, IOC.G_FMT, current)
    return current

476 

477 

def get_format(fd, buffer_type) -> Format:
    """Return the current format of *buffer_type*.

    Metadata buffer types produce a ``MetaFmt``; all others produce a ``Format``.
    """
    raw_fmt = get_raw_format(fd, buffer_type)
    is_meta = buffer_type in {BufferType.META_CAPTURE, BufferType.META_OUTPUT}
    if not is_meta:
        pix = raw_fmt.fmt.pix
        return Format(
            width=pix.width,
            height=pix.height,
            pixel_format=PixelFormat(pix.pixelformat),
            size=pix.sizeimage,
        )
    meta = raw_fmt.fmt.meta
    return MetaFmt(
        format=MetaFormat(meta.dataformat),
        max_buffer_size=meta.buffersize,
        width=meta.width,
        height=meta.height,
        bytes_per_line=meta.bytesperline,
    )

494 

495 

def try_raw_format(fd, fmt: raw.v4l2_format):
    """Run the TRY_FMT ioctl on *fmt* and return the ioctl result.

    The result is returned (as ``set_raw_format`` does) so that callers such
    as ``try_format`` can inspect the outcome instead of receiving ``None``.
    """
    return ioctl(fd, IOC.TRY_FMT, fmt)

498 

499 

def try_format(fd, buffer_type: BufferType, width: int, height: int, pixel_format: str = "MJPG"):
    """Build a format request like ``set_format`` does, but submit it via ``try_raw_format``.

    *pixel_format* may be a FourCC string (converted with ``raw.v4l2_fourcc``)
    or an already numeric format code.
    """
    request = raw.v4l2_format()
    if isinstance(pixel_format, str):
        pixel_format = raw.v4l2_fourcc(*pixel_format)
    request.type = buffer_type
    pix = request.fmt.pix
    pix.pixelformat = pixel_format
    pix.field = Field.ANY
    pix.width = width
    pix.height = height
    pix.bytesperline = 0
    pix.sizeimage = 0
    return try_raw_format(fd, request)

512 

513 

def get_parm(fd, buffer_type):
    """Run the G_PARM ioctl for *buffer_type* and return the stream-parameter struct."""
    params = raw.v4l2_streamparm()
    params.type = buffer_type
    ioctl(fd, IOC.G_PARM, params)
    return params

519 

520 

def set_fps(fd, buffer_type, fps):
    """Set the frame rate of a VIDEO_CAPTURE or VIDEO_OUTPUT stream.

    *fps* is approximated by a fraction whose numerator and denominator both
    fit in an unsigned 32-bit field, then written as time-per-frame
    (the inverse of fps) with the S_PARM ioctl.

    Raises:
        ValueError: if *buffer_type* is neither VIDEO_CAPTURE nor VIDEO_OUTPUT.
    """
    # v4l2 fraction is u32: the largest storable value is 2**32 - 1
    u32_max = 2**32 - 1
    # Bounding the denominator by u32_max / fps also keeps the numerator
    # (about fps * denominator) within u32.
    max_denominator = int(min(u32_max, u32_max / fps))
    p = raw.v4l2_streamparm()
    p.type = buffer_type
    fps = fractions.Fraction(fps).limit_denominator(max_denominator)
    if buffer_type == BufferType.VIDEO_CAPTURE:
        p.parm.capture.timeperframe.numerator = fps.denominator
        p.parm.capture.timeperframe.denominator = fps.numerator
    elif buffer_type == BufferType.VIDEO_OUTPUT:
        p.parm.output.timeperframe.numerator = fps.denominator
        p.parm.output.timeperframe.denominator = fps.numerator
    else:
        raise ValueError(f"Unsupported buffer type {buffer_type!r}")
    return ioctl(fd, IOC.S_PARM, p)

536 

537 

def get_fps(fd, buffer_type):
    """Return the frame rate of a VIDEO_CAPTURE or VIDEO_OUTPUT stream as a fraction.

    The value is the inverse of the time-per-frame read with ``get_parm``.

    Raises:
        ValueError: if *buffer_type* is neither VIDEO_CAPTURE nor VIDEO_OUTPUT.
    """
    params = get_parm(fd, buffer_type)
    if buffer_type == BufferType.VIDEO_CAPTURE:
        interval = params.parm.capture.timeperframe
    elif buffer_type == BufferType.VIDEO_OUTPUT:
        interval = params.parm.output.timeperframe
    else:
        raise ValueError(f"Unsupported buffer type {buffer_type!r}")
    return fractions.Fraction(interval.denominator, interval.numerator)

547 

548 

def stream_on(fd, buffer_type):
    """Issue the STREAMON ioctl for *buffer_type* and return the ioctl result."""
    return ioctl(fd, IOC.STREAMON, cenum(buffer_type))

552 

553 

def stream_off(fd, buffer_type):
    """Issue the STREAMOFF ioctl for *buffer_type* and return the ioctl result."""
    return ioctl(fd, IOC.STREAMOFF, cenum(buffer_type))

557 

558 

def set_selection(fd, buffer_type, target, rectangle):
    """Apply *rectangle* (left, top, width, height) to *target* with the S_SELECTION ioctl."""
    selection = raw.v4l2_selection()
    selection.type = buffer_type
    selection.target = target
    area = selection.r
    area.left = rectangle.left
    area.top = rectangle.top
    area.width = rectangle.width
    area.height = rectangle.height
    ioctl(fd, IOC.S_SELECTION, selection)

568 

569 

def get_selection(
    fd,
    buffer_type: BufferType,
    target: SelectionTarget = SelectionTarget.CROP,
) -> Rect:
    """Read the rectangle of *target* with the G_SELECTION ioctl and return it as a ``Rect``."""
    selection = raw.v4l2_selection()
    selection.type = buffer_type
    selection.target = target
    ioctl(fd, IOC.G_SELECTION, selection)
    area = selection.r
    return Rect(left=area.left, top=area.top, width=area.width, height=area.height)

580 

581 

def get_control(fd, id):
    """Read control *id* with the G_CTRL ioctl and return its ``value`` field."""
    request = raw.v4l2_control(id)
    ioctl(fd, IOC.G_CTRL, request)
    return request.value

586 

587 

# ctypes element type used for array payloads of each numeric control type.
CTRL_TYPE_CTYPE_ARRAY = dict(
    [
        (ControlType.U8, ctypes.c_uint8),
        (ControlType.U16, ctypes.c_uint16),
        (ControlType.U32, ctypes.c_uint32),
        (ControlType.INTEGER, ctypes.c_int),
        (ControlType.INTEGER64, ctypes.c_int64),
    ]
)

# Cache of control type -> raw struct, filled on demand by get_ctrl_type_struct().
CTRL_TYPE_CTYPE_STRUCT = {
    # ControlType.AREA: raw.v4l2_area,
}

600 

601 

def _struct_for_ctrl_type(ctrl_type):
    """Look up the ``raw`` struct for a compound control type.

    Tries ``raw.v4l2_ctrl_<name>`` first and falls back to ``raw.v4l2_<name>``,
    where ``<name>`` is the lower-cased ``ControlType`` member name.

    Raises:
        AttributeError: if neither attribute exists.
    """
    suffix = ControlType(ctrl_type).name.lower()
    struct = getattr(raw, f"v4l2_ctrl_{suffix}", None)
    if struct is None:
        struct = getattr(raw, f"v4l2_{suffix}")
    return struct

610 

611 

def get_ctrl_type_struct(ctrl_type):
    """Return the raw struct for *ctrl_type*, caching it in ``CTRL_TYPE_CTYPE_STRUCT``."""
    cached = CTRL_TYPE_CTYPE_STRUCT.get(ctrl_type)
    if cached is not None:
        return cached
    resolved = _struct_for_ctrl_type(ctrl_type)
    CTRL_TYPE_CTYPE_STRUCT[ctrl_type] = resolved
    return resolved

618 

619 

def convert_to_ctypes_array(lst, depth, ctype):
    """Convert a list (arbitrary depth) to a ctypes array.

    *depth* is the nesting level of *lst*; *ctype* is the element type of the
    innermost lists.
    """
    if depth != 1:
        # Convert the nested lists first, then wrap them in an array of arrays.
        rows = [convert_to_ctypes_array(row, depth - 1, ctype) for row in lst]
        row_type = type(rows[0])
        return (row_type * len(rows))(*rows)
    return (ctype * len(lst))(*lst)

629 

630 

def _prepare_read_control_value(control: raw.v4l2_query_ext_ctrl, raw_control: raw.v4l2_ext_control):
    """Prepare *raw_control* for reading *control* and return its payload buffer.

    Returns ``None`` for controls without the HAS_PAYLOAD flag. Otherwise a
    buffer is allocated (string buffer, struct, or possibly multi-dimensional
    array), attached to *raw_control*, and returned so the caller keeps it
    alive and can read it after the ioctl.
    """
    raw_control.id = control.id
    if ControlFlag.HAS_PAYLOAD not in ControlFlag(control.flags):
        return None

    if control.type == ControlType.STRING:
        size = control.maximum + 1
        payload = ctypes.create_string_buffer(size)
        raw_control.string = payload
        raw_control.size = size
        return payload

    raw_control.size = control.elem_size * control.elems
    element_type = CTRL_TYPE_CTYPE_ARRAY.get(control.type)
    if element_type is None:
        # Not a numeric array: the payload is a single raw struct.
        payload = get_ctrl_type_struct(control.type)()
        raw_control.ptr = ctypes.cast(ctypes.pointer(payload), ctypes.c_void_p)
        return payload

    array_type = element_type
    for axis in range(control.nr_of_dims):
        array_type = array_type * control.dims[axis]
    payload = array_type()
    raw_control.ptr = ctypes.cast(payload, ctypes.c_void_p)
    return payload

654 

655 

def _get_control_value(control: raw.v4l2_query_ext_ctrl, raw_control: raw.v4l2_ext_control, data):
    """Extract the Python value of a control after it has been read.

    *data* is the payload buffer (or ``None`` for controls without payload).
    Without payload the value comes from ``raw_control.value64`` for INTEGER64
    controls and ``raw_control.value`` otherwise. String payloads are decoded;
    any other payload is returned as is.
    """
    if data is not None:
        if control.type == ControlType.STRING:
            return data.value.decode()
        return data
    if control.type == ControlType.INTEGER64:
        return raw_control.value64
    return raw_control.value

665 

666 

def get_controls_values(fd, controls: list[raw.v4l2_query_ext_ctrl], which=raw.ControlWhichValue.CUR_VAL, request_fd=0):
    """Read the values of *controls* with a single G_EXT_CTRLS ioctl.

    Returns a list of values in the same order as *controls*.
    """
    total = len(controls)
    request = raw.v4l2_ext_controls()
    request.which = which
    request.count = total
    request.request_fd = request_fd
    request.controls = (total * raw.v4l2_ext_control)()
    # Payload buffers must stay referenced until the values are extracted.
    payloads = []
    for control, raw_control in zip(controls, request.controls):
        payloads.append(_prepare_read_control_value(control, raw_control))
    ioctl(fd, IOC.G_EXT_CTRLS, request)
    results = []
    for control, raw_control, payload in zip(controls, request.controls, payloads):
        results.append(_get_control_value(control, raw_control, payload))
    return results

677 

678 

def set_control(fd, id, value):
    """Write *value* to control *id* with the S_CTRL ioctl."""
    request = raw.v4l2_control(id, value)
    ioctl(fd, IOC.S_CTRL, request)

682 

683 

def _prepare_write_controls_values(control: raw.v4l2_query_ext_ctrl, value: object, raw_control: raw.v4l2_ext_control):
    """Fill *raw_control* so that *value* is written to *control*.

    Controls with the HAS_PAYLOAD flag receive a pointer to the data: an
    encoded string buffer, a raw struct (the value is assumed to already be
    the proper raw struct), or a ctypes array built from nested lists.
    Controls without payload store the value directly (``value64`` for
    INTEGER64 controls, ``value`` otherwise).
    """
    raw_control.id = control.id
    has_payload = ControlFlag.HAS_PAYLOAD in ControlFlag(control.flags)
    if has_payload:
        if control.type == ControlType.STRING:
            encoded = value.encode()
            raw_control.string = ctypes.create_string_buffer(encoded)
            # Size is the encoded byte length plus the terminating NUL; the
            # character count would be too small for non-ASCII text.
            raw_control.size = len(encoded) + 1
        else:
            array_type = CTRL_TYPE_CTYPE_ARRAY.get(control.type)
            raw_control.size = control.elem_size * control.elems
            # a struct: assume value is proper raw struct
            if array_type is None:
                value = ctypes.pointer(value)
            else:
                value = convert_to_ctypes_array(value, control.nr_of_dims, array_type)
            ptr = ctypes.cast(value, ctypes.c_void_p)
            raw_control.ptr = ptr
    else:
        if control.type == ControlType.INTEGER64:
            raw_control.value64 = value
        else:
            raw_control.value = value

706 

707 

def set_controls_values(
    fd, controls_values: list[tuple[raw.v4l2_query_ext_ctrl, object]], which=raw.ControlWhichValue.CUR_VAL, request_fd=0
):
    """Write several ``(control, value)`` pairs with a single S_EXT_CTRLS ioctl."""
    total = len(controls_values)
    request = raw.v4l2_ext_controls()
    request.which = which
    request.count = total
    request.request_fd = request_fd
    request.controls = (total * raw.v4l2_ext_control)()
    for pair, raw_control in zip(controls_values, request.controls):
        control, value = pair
        _prepare_write_controls_values(control, value, raw_control)
    ioctl(fd, IOC.S_EXT_CTRLS, request)

720 

721 

def get_priority(fd) -> Priority:
    """Read the access priority with the G_PRIORITY ioctl."""
    holder = ctypes.c_uint()
    ioctl(fd, IOC.G_PRIORITY, holder)
    return Priority(holder.value)

726 

727 

def set_priority(fd, priority: Priority):
    """Set the access priority with the S_PRIORITY ioctl."""
    holder = ctypes.c_uint(priority.value)
    ioctl(fd, IOC.S_PRIORITY, holder)

731 

732 

def subscribe_event(
    fd,
    event_type: EventType,
    id: int = 0,
    flags: EventSubscriptionFlag = 0,
):
    """Subscribe to *event_type* (optionally for a given *id*) with the SUBSCRIBE_EVENT ioctl."""
    subscription = raw.v4l2_event_subscription()
    subscription.type = event_type
    subscription.id = id
    subscription.flags = flags
    ioctl(fd, IOC.SUBSCRIBE_EVENT, subscription)

744 

745 

def unsubscribe_event(fd, event_type: EventType = EventType.ALL, id: int = 0):
    """Cancel the subscription to *event_type* / *id* with the UNSUBSCRIBE_EVENT ioctl."""
    subscription = raw.v4l2_event_subscription()
    subscription.id = id
    subscription.type = event_type
    ioctl(fd, IOC.UNSUBSCRIBE_EVENT, subscription)

751 

752 

def deque_event(fd) -> raw.v4l2_event:
    """Run the DQEVENT ioctl and return the ioctl result."""
    pending = raw.v4l2_event()
    result = ioctl(fd, IOC.DQEVENT, pending)
    return result

756 

757 

def set_edid(fd, edid):
    """Write *edid* (bytes whose length is a multiple of 128) with the S_EDID ioctl.

    Raises:
        ValueError: if the length of *edid* is not a multiple of 128.
    """
    block_size = 128
    if len(edid) % block_size != 0:
        raise ValueError(f"EDID length {len(edid)} is not multiple of 128")
    request = raw.v4l2_edid()
    request.pad = 0
    request.start_block = 0
    request.blocks = len(edid) // block_size
    storage = create_string_buffer(edid)
    request.edid = cast(storage, type(request.edid))
    ioctl(fd, IOC.S_EDID, request)

768 

769 

def clear_edid(fd):
    """Clear the EDID by writing an empty one (zero blocks)."""
    empty = b""
    set_edid(fd, empty)

772 

773 

def get_edid(fd):
    """Read the EDID as bytes; returns ``b""`` when the device reports zero blocks.

    G_EDID is issued twice: first to learn the block count, then again with a
    buffer of ``128 * blocks`` bytes attached to receive the data.
    """
    request = raw.v4l2_edid()
    ioctl(fd, IOC.G_EDID, request)
    block_count = request.blocks
    if block_count == 0:
        return b""
    size = 128 * block_count
    storage = create_string_buffer(b"\0" * size)
    request.edid = cast(storage, type(request.edid))
    ioctl(fd, IOC.G_EDID, request)
    return string_at(request.edid, size)

784 

785 

def get_input(fd):
    """Return the index read with the G_INPUT ioctl."""
    selected = ctypes.c_uint()
    ioctl(fd, IOC.G_INPUT, selected)
    return selected.value

790 

791 

def set_input(fd, index: int):
    """Select input *index* with the S_INPUT ioctl."""
    selected = ctypes.c_uint(index)
    ioctl(fd, IOC.S_INPUT, selected)

795 

796 

def get_output(fd):
    """Return the index read with the G_OUTPUT ioctl."""
    selected = ctypes.c_uint()
    ioctl(fd, IOC.G_OUTPUT, selected)
    return selected.value

801 

802 

def set_output(fd, index: int):
    """Select output *index* with the S_OUTPUT ioctl."""
    selected = ctypes.c_uint(index)
    ioctl(fd, IOC.S_OUTPUT, selected)

806 

807 

def get_std(fd) -> StandardID:
    """Read the video standard with the G_STD ioctl."""
    holder = ctypes.c_uint64()
    ioctl(fd, IOC.G_STD, holder)
    return StandardID(holder.value)

812 

813 

def set_std(fd, std):
    """Set the video standard with the S_STD ioctl.

    *std* may be an int (including a ``StandardID`` member) or an already
    built ctypes object. Ints are wrapped in a 64-bit unsigned ctypes value,
    matching the type ``get_std`` reads and the way the other setters in this
    module wrap their integer arguments.
    """
    if isinstance(std, int):
        std = ctypes.c_uint64(std)
    ioctl(fd, IOC.S_STD, std)

816 

817 

def query_std(fd) -> StandardID:
    """Read the video standard reported by the QUERYSTD ioctl."""
    holder = ctypes.c_uint64()
    ioctl(fd, IOC.QUERYSTD, holder)
    return StandardID(holder.value)

822 

823 

SubdevFormat = collections.namedtuple(
    "SubdevFormat",
    [
        "pad",
        "which",
        "width",
        "height",
        "code",
        "field",
        "colorspace",
        "quantization",
        "xfer_func",
        "flags",
        "stream",
    ],
)


def _translate_subdev_format(fmt: raw.v4l2_subdev_format):
    """Convert a raw sub-device format struct into a ``SubdevFormat`` tuple."""
    frame = fmt.format
    return SubdevFormat(
        pad=fmt.pad,
        which=raw.SubdevFormatWhence(fmt.which),
        width=frame.width,
        height=frame.height,
        code=raw.MbusPixelcode(frame.code),
        field=raw.Field(frame.field),
        colorspace=raw.Colorspace(frame.colorspace),
        quantization=raw.Quantization(frame.quantization),
        xfer_func=raw.XferFunc(frame.xfer_func),
        flags=raw.MbusFrameFormatFlag(frame.flags),
        stream=fmt.stream,
    )

843 

844 

def get_subdevice_format(fd, pad: int = 0) -> SubdevFormat:
    """Read the ACTIVE format of *pad* with the SUBDEV_G_FMT ioctl.

    The ioctl result is translated into a ``SubdevFormat`` tuple.
    """
    fmt = raw.v4l2_subdev_format(pad=pad, which=raw.SubdevFormatWhence.ACTIVE)
    return _translate_subdev_format(ioctl(fd, IOC.SUBDEV_G_FMT, fmt))

848 

849 

850# Helpers 

851 

852 

def request_and_query_buffer(fd, buffer_type: BufferType, memory: Memory) -> raw.v4l2_buffer:
    """Request a single buffer and return its queried description."""
    (first, *_) = request_and_query_buffers(fd, buffer_type, memory, 1)
    return first

857 

858 

def request_and_query_buffers(fd, buffer_type: BufferType, memory: Memory, count: int) -> list[raw.v4l2_buffer]:
    """Request *count* buffers and return the queried description of each one.

    The request struct's ``count`` after the ioctl is the number actually
    granted, which may differ from *count*. Only buffers that exist are
    queried, and never more than *count* are returned.
    """
    req = request_buffers(fd, buffer_type, memory, count)
    granted = min(count, req.count)
    return [query_buffer(fd, buffer_type, memory, index) for index in range(granted)]

863 

864 

def mmap_from_buffer(fd, buff: raw.v4l2_buffer) -> mmap.mmap:
    """Memory-map *buff* using its ``length`` and ``m.offset`` fields."""
    size = buff.length
    start = buff.m.offset
    return mem_map(fd, size, offset=start)

867 

868 

def create_mmap_buffers(fd, buffer_type: BufferType, memory: Memory, count: int) -> list[mmap.mmap]:
    """create buffers + mmap_from_buffer"""
    mappings = []
    for buff in request_and_query_buffers(fd, buffer_type, memory, count):
        mappings.append(mmap_from_buffer(fd, buff))
    return mappings

872 

873 

def create_mmap_buffer(fd, buffer_type: BufferType, memory: Memory) -> mmap.mmap:
    """Request a single buffer and return its memory map.

    Returns the map itself (not a one-element list), as the return annotation
    declares and as ``request_and_query_buffer`` does for buffer descriptions.
    """
    return create_mmap_buffers(fd, buffer_type, memory, 1)[0]

876 

877 

def enqueue_buffers(fd, buffer_type: BufferType, memory: Memory, count: int) -> list[raw.v4l2_buffer]:
    """Queue buffers 0..count-1, each with zero bytes used, and return their structs."""
    queued = []
    for index in range(count):
        queued.append(enqueue_buffer(fd, buffer_type, memory, 0, index))
    return queued

880 

881 

882class Device(BaseDevice): 

883 PREFIX = "/dev/video" 

884 

885 def __init__(self, name_or_file, read_write=True, io=IO): 

886 self.info = None 

887 self.controls = None 

888 super().__init__(name_or_file, read_write=read_write, io=io) 

889 

890 def __iter__(self): 

891 with VideoCapture(self) as stream: 

892 yield from stream 

893 

894 async def __aiter__(self): 

895 with VideoCapture(self) as stream: 

896 async for frame in stream: 

897 yield frame 

898 

899 def _on_open(self): 

900 self.info = InfoEx(self) 

901 self.controls = Controls.from_device(self) 

902 

903 def query_buffer(self, buffer_type, memory, index): 

904 return query_buffer(self.fileno(), buffer_type, memory, index) 

905 

906 def enqueue_buffer(self, buffer_type: BufferType, memory: Memory, size: int, index: int) -> raw.v4l2_buffer: 

907 return enqueue_buffer(self.fileno(), buffer_type, memory, size, index) 

908 

909 def dequeue_buffer(self, buffer_type: BufferType, memory: Memory) -> raw.v4l2_buffer: 

910 return dequeue_buffer(self.fileno(), buffer_type, memory) 

911 

912 def request_buffers(self, buffer_type, memory, size): 

913 return request_buffers(self.fileno(), buffer_type, memory, size) 

914 

915 def create_buffers(self, buffer_type: BufferType, memory: Memory, count: int) -> list[raw.v4l2_buffer]: 

916 return request_and_query_buffers(self.fileno(), buffer_type, memory, count) 

917 

918 def free_buffers(self, buffer_type, memory): 

919 return free_buffers(self.fileno(), buffer_type, memory) 

920 

921 def enqueue_buffers(self, buffer_type: BufferType, memory: Memory, count: int) -> list[raw.v4l2_buffer]: 

922 return enqueue_buffers(self.fileno(), buffer_type, memory, count) 

923 

924 def set_format( 

925 self, 

926 buffer_type: BufferType, 

927 width: int, 

928 height: int, 

929 pixel_format: str = "MJPG", 

930 ): 

931 return set_format(self.fileno(), buffer_type, width, height, pixel_format=pixel_format) 

932 

933 def get_format(self, buffer_type) -> Format: 

934 return get_format(self.fileno(), buffer_type) 

935 

936 def set_fps(self, buffer_type, fps): 

937 return set_fps(self.fileno(), buffer_type, fps) 

938 

939 def get_fps(self, buffer_type): 

940 return get_fps(self.fileno(), buffer_type) 

941 

942 def set_selection(self, buffer_type, target, rectangle): 

943 return set_selection(self.fileno(), buffer_type, target, rectangle) 

944 

945 def get_selection(self, buffer_type, target): 

946 return get_selection(self.fileno(), buffer_type, target) 

947 

948 def get_priority(self) -> Priority: 

949 return get_priority(self.fileno()) 

950 

951 def set_priority(self, priority: Priority): 

952 set_priority(self.fileno(), priority) 

953 

def stream_on(self, buffer_type):
    """Start streaming for ``buffer_type``, logging before and after the call."""
    type_name = buffer_type.name
    self.log.info("Starting %r stream...", type_name)
    stream_on(self.fileno(), buffer_type)
    self.log.info("%r stream ON", type_name)

958 

def stream_off(self, buffer_type):
    """Stop streaming for ``buffer_type``, logging before and after the call.

    Delegates to the module-level ``stream_off`` using this device's file
    descriptor.
    """
    # Log message typo fixed ("Stoping" -> "Stopping").
    self.log.info("Stopping %r stream...", buffer_type.name)
    stream_off(self.fileno(), buffer_type)
    self.log.info("%r stream OFF", buffer_type.name)

963 

def write(self, data: bytes) -> None:
    """Write ``data`` to the underlying file object (``self._fobj``)."""
    sink = self._fobj
    sink.write(data)

966 

def subscribe_event(
    self,
    event_type: EventType = EventType.ALL,
    id: int = 0,
    flags: EventSubscriptionFlag = 0,
):
    """Subscribe to ``event_type`` events through the module-level ``subscribe_event``.

    With no arguments this subscribes to ``EventType.ALL`` with id 0 and no flags.
    """
    fd = self.fileno()
    return subscribe_event(fd, event_type, id, flags)

974 

def unsubscribe_event(self, event_type: EventType = EventType.ALL, id: int = 0):
    """Unsubscribe from ``event_type`` events through the module-level ``unsubscribe_event``."""
    fd = self.fileno()
    return unsubscribe_event(fd, event_type, id)

977 

def deque_event(self):
    """Delegate to the module-level ``deque_event`` using this device's file descriptor."""
    fd = self.fileno()
    return deque_event(fd)

980 

def set_edid(self, edid):
    """Apply ``edid`` through the module-level ``set_edid``; returns nothing."""
    fd = self.fileno()
    set_edid(fd, edid)

983 

def clear_edid(self):
    """Call the module-level ``clear_edid`` for this device; returns nothing."""
    fd = self.fileno()
    clear_edid(fd)

986 

def get_edid(self):
    """Return whatever the module-level ``get_edid`` reports for this device."""
    fd = self.fileno()
    return get_edid(fd)

989 

def get_input(self):
    """Return whatever the module-level ``get_input`` reports for this device."""
    fd = self.fileno()
    return get_input(fd)

992 

def set_input(self, index: int):
    """Select input ``index`` through the module-level ``set_input``."""
    fd = self.fileno()
    return set_input(fd, index)

995 

def get_output(self):
    """Return whatever the module-level ``get_output`` reports for this device."""
    fd = self.fileno()
    return get_output(fd)

998 

def set_output(self, index: int):
    """Select output ``index`` through the module-level ``set_output``."""
    fd = self.fileno()
    return set_output(fd, index)

1001 

def get_std(self) -> StandardID:
    """Return the standard reported by the module-level ``get_std``."""
    fd = self.fileno()
    return get_std(fd)

1004 

def set_std(self, std):
    """Apply ``std`` through the module-level ``set_std``."""
    fd = self.fileno()
    return set_std(fd, std)

1007 

def query_std(self) -> StandardID:
    """Return the standard reported by the module-level ``query_std``."""
    fd = self.fileno()
    return query_std(fd)

1010 

1011 

class SubDevice(BaseDevice):
    """Sub-device wrapper exposing the format of one of its pads."""

    def get_format(self, pad: int = 0) -> SubdevFormat:
        """Return the format of ``pad`` (default 0) via ``get_subdevice_format``."""
        fmt = get_subdevice_format(self, pad=pad)
        return fmt

1015 

1016 

def create_artificial_control_class(class_id):
    """Build a stand-in control-class descriptor for ``class_id``.

    The result is a ``raw.v4l2_query_ext_ctrl`` with id ``class_id | 1``,
    the fixed name ``b"Generic Controls"`` and type ``CTRL_CLASS``.
    """
    fields = {
        "id": class_id | 1,
        "name": b"Generic Controls",
        "type": ControlType.CTRL_CLASS,
    }
    return raw.v4l2_query_ext_ctrl(**fields)

1023 

1024 

class Controls(dict):
    """Lazily loaded mapping of control id to control object for a device.

    The mapping starts empty; the first read access queries
    ``device.info.controls`` and fills it in. Besides the numeric id, a
    control can be looked up by its ``config_name``, either as an item
    (through ``__missing__``) or as an attribute (through ``__getattr__``).

    Every read entry point of the mapping (item access, length, iteration,
    membership, ``keys``/``values``/``items``/``get``) triggers the lazy load,
    so an untouched instance never looks spuriously empty.
    """

    def __init__(self, device: Device):
        super().__init__()
        # Written through __dict__ because __setattr__ is overridden to store
        # mapping items instead of instance attributes.
        self.__dict__["_device"] = device
        self.__dict__["_initialized"] = False

    def _init_if_needed(self):
        """Populate the mapping from the device the first time it is needed."""
        if not self._initialized:
            self._load()
            self.__dict__["_initialized"] = True

    def __getitem__(self, name):
        self._init_if_needed()
        return super().__getitem__(name)

    def __len__(self):
        self._init_if_needed()
        return super().__len__()

    def __iter__(self):
        self._init_if_needed()
        return super().__iter__()

    def __contains__(self, key):
        self._init_if_needed()
        return super().__contains__(key)

    def keys(self):
        self._init_if_needed()
        return super().keys()

    def items(self):
        self._init_if_needed()
        return super().items()

    def get(self, key, default=None):
        self._init_if_needed()
        return super().get(key, default)

    def _load(self):
        """Read all controls from the device and store one wrapper per control id."""
        ctrl_type_map = {
            ControlType.BOOLEAN: BooleanControl,
            ControlType.INTEGER: IntegerControl,
            ControlType.INTEGER64: Integer64Control,
            ControlType.MENU: MenuControl,
            ControlType.INTEGER_MENU: MenuControl,
            ControlType.U8: U8Control,
            ControlType.U16: U16Control,
            ControlType.U32: U32Control,
            ControlType.BUTTON: ButtonControl,
        }
        classes = {}
        for ctrl in self._device.info.controls:
            ctrl_type = ControlType(ctrl.type)
            ctrl_class_id = V4L2_CTRL_ID2CLASS(ctrl.id)
            if ctrl_type == ControlType.CTRL_CLASS:
                # Class descriptors are remembered, not stored as controls.
                classes[ctrl_class_id] = ctrl
            else:
                klass = classes.get(ctrl_class_id)
                if klass is None:
                    # No class descriptor seen for this control: fabricate one.
                    klass = create_artificial_control_class(ctrl_class_id)
                    classes[ctrl_class_id] = klass
                has_payload = ControlFlag.HAS_PAYLOAD in ControlFlag(ctrl.flags)
                if has_payload:
                    ctrl_class = CompoundControl
                else:
                    ctrl_class = ctrl_type_map.get(ctrl_type, GenericControl)
                self[ctrl.id] = ctrl_class(self._device, ctrl, klass)

    @classmethod
    def from_device(cls, device):
        """Deprecated: backward compatible. Please use Controls(device) constructor directly"""
        return cls(device)

    def __getattr__(self, key):
        # Only reached when normal attribute lookup fails: fall back to item
        # lookup (which includes the config_name search of __missing__).
        with contextlib.suppress(KeyError):
            return self[key]
        raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{key}'")

    def __setattr__(self, key, value):
        # Attribute assignment stores a mapping item, not an instance attribute.
        self._init_if_needed()
        self[key] = value

    def __missing__(self, key):
        """Resolve ``key`` as a control ``config_name`` when it is not a stored key."""
        self._init_if_needed()
        for v in self.values():
            if isinstance(v, BaseControl) and (v.config_name == key):
                return v
        raise KeyError(key)

    def values(self):
        self._init_if_needed()
        return super().values()

    def used_classes(self):
        """Return the distinct control-class descriptors used by the stored controls."""
        class_map = {v.control_class.id: v.control_class for v in self.values() if isinstance(v, BaseControl)}
        return list(class_map.values())

    def with_class(self, control_class):
        """Yield the controls belonging to ``control_class``.

        ``control_class`` may be a ``ControlClass`` member name (any case), an
        integer value, or an object with an ``id`` attribute.
        """
        if isinstance(control_class, str):
            control_class = ControlClass[control_class.upper()]
        elif isinstance(control_class, int):
            control_class = ControlClass(control_class)
        elif not isinstance(control_class, ControlClass):
            # Descriptor ids are compared as ``id - 1`` throughout this method.
            control_class = ControlClass(control_class.id - 1)
        for v in self.values():
            if not isinstance(v, BaseControl):
                continue
            if v.control_class.id - 1 == control_class:
                yield v

    def set_to_default(self):
        """Reset every control that supports it to its default value."""
        for v in self.values():
            if not isinstance(v, BaseControl):
                continue

            # Controls without set_to_default, or that refuse the write with
            # AttributeError (not writeable), are skipped.
            with contextlib.suppress(AttributeError):
                v.set_to_default()

    def set_clipping(self, clipping: bool) -> None:
        """Set the ``clipping`` flag on every numeric control."""
        for v in self.values():
            if isinstance(v, BaseNumericControl):
                v.clipping = clipping

1128 

1129 

class BaseControl:
    """Common state and flag helpers shared by all control wrappers."""

    def __init__(self, device, info, control_class):
        self.device = device
        self.control_class = control_class
        self._info = info
        self._config_name = None
        self.id = info.id
        self.name = info.name.decode()
        self.type = ControlType(info.type)
        self.flags = ControlFlag(info.flags)

        # ``standard`` is the matching ControlID member, or None when the id
        # is not a known ControlID value.
        try:
            self.standard = ControlID(self.id)
        except ValueError:
            self.standard = None

    def __repr__(self):
        parts = [str(self.config_name)]

        extra = self._get_repr().strip()
        if extra:
            parts.append(extra)

        if self.flags:
            raw_flags = self._info.flags
            names = ",".join(flag.name.lower() for flag in ControlFlag if (raw_flags & flag) == flag)
            parts.append("flags=" + names)

        return f"<{type(self).__name__} {' '.join(parts)}>"

    def _get_repr(self) -> str:
        """Extra text for ``__repr__``; subclasses override, the base adds nothing."""
        return ""

    def _get_control(self):
        """Read the current raw value of this control from the device."""
        values = get_controls_values(self.device, (self._info,))
        return values[0]

    def _set_control(self, value):
        """Write ``value`` to the device, or raise AttributeError if not writeable."""
        if self.is_writeable:
            set_controls_values(self.device, ((self._info, value),))
            return
        checks = (
            ("read-only", self.is_flagged_read_only),
            ("inactive", self.is_flagged_inactive),
            ("disabled", self.is_flagged_disabled),
            ("grabbed", self.is_flagged_grabbed),
        )
        reasons = [label for label, flagged in checks if flagged]
        raise AttributeError(f"{self.__class__.__name__} {self.config_name} is not writeable: {', '.join(reasons)}")

    @property
    def config_name(self) -> str:
        """Lower-case name with parentheses removed and ", " / " " turned into "_" (cached)."""
        if self._config_name is None:
            lowered = self.name.lower()
            stripped = lowered.replace("(", "").replace(")", "")
            self._config_name = stripped.replace(", ", "_").replace(" ", "_")
        return self._config_name

    @property
    def is_flagged_disabled(self) -> bool:
        return ControlFlag.DISABLED in self.flags

    @property
    def is_flagged_grabbed(self) -> bool:
        return ControlFlag.GRABBED in self.flags

    @property
    def is_flagged_read_only(self) -> bool:
        return ControlFlag.READ_ONLY in self.flags

    @property
    def is_flagged_update(self) -> bool:
        return ControlFlag.UPDATE in self.flags

    @property
    def is_flagged_inactive(self) -> bool:
        return ControlFlag.INACTIVE in self.flags

    @property
    def is_flagged_slider(self) -> bool:
        return ControlFlag.SLIDER in self.flags

    @property
    def is_flagged_write_only(self) -> bool:
        return ControlFlag.WRITE_ONLY in self.flags

    @property
    def is_flagged_volatile(self) -> bool:
        return ControlFlag.VOLATILE in self.flags

    @property
    def is_flagged_has_payload(self) -> bool:
        return ControlFlag.HAS_PAYLOAD in self.flags

    @property
    def is_flagged_execute_on_write(self) -> bool:
        return ControlFlag.EXECUTE_ON_WRITE in self.flags

    @property
    def is_flagged_modify_layout(self) -> bool:
        return ControlFlag.MODIFY_LAYOUT in self.flags

    @property
    def is_flagged_dynamic_array(self) -> bool:
        return ControlFlag.DYNAMIC_ARRAY in self.flags

    @property
    def is_writeable(self) -> bool:
        """True unless the control is flagged read-only, disabled or grabbed."""
        blocked = self.is_flagged_read_only or self.is_flagged_disabled or self.is_flagged_grabbed
        return not blocked

1244 

1245 

class BaseMonoControl(BaseControl):
    """Control holding a single value, with read/write conversion hooks."""

    def _get_repr(self) -> str:
        text = f" default={self.default}"
        if self.is_flagged_write_only:
            return text
        # Reading the value may fail; show the error instead of breaking repr().
        try:
            text += f" value={self.value}"
        except Exception as error:
            text += f" value=<error: {error!r}>"
        return text

    def _convert_read(self, value):
        """Hook: turn a raw value read from the device into the Python value."""
        return value

    @property
    def default(self):
        """Default value of the control, passed through ``_convert_read``."""
        raw_default = get_controls_values(self.device, (self._info,), raw.ControlWhichValue.DEF_VAL)[0]
        return self._convert_read(raw_default)

    @property
    def value(self):
        """Current value of the control; ``None`` for write-only controls."""
        if self.is_flagged_write_only:
            return None
        return self._convert_read(self._get_control())

    def _convert_write(self, value):
        """Hook: coerce a user-supplied value before it is written."""
        return value

    def _mangle_write(self, value):
        """Hook: adjust the converted value just before it is written."""
        return value

    @value.setter
    def value(self, value):
        converted = self._convert_write(value)
        self._set_control(self._mangle_write(converted))

    def set_to_default(self):
        """Write the control's default value back as its current value."""
        self.value = self.default

1284 

1285 

class GenericControl(BaseMonoControl):
    """Single-value control with no extra conversion or validation."""

1288 

1289 

class BaseNumericControl(BaseMonoControl):
    """Integer-valued control with minimum, maximum, step and optional clipping.

    ``lower_bound`` / ``upper_bound`` are the limits of the underlying integer
    type; subclasses override them. The constructor raises ``RuntimeWarning``
    when the device reports a minimum or maximum outside those limits.
    """

    lower_bound = -(2**31)
    upper_bound = 2**31 - 1

    def __init__(self, device, info, control_class, clipping=True):
        super().__init__(device, info, control_class)
        self.minimum = self._info.minimum
        self.maximum = self._info.maximum
        self.step = self._info.step
        # When True, out-of-range writes are clamped; when False they raise.
        self.clipping = clipping

        if self.minimum < self.lower_bound:
            raise RuntimeWarning(
                f"Control {self.config_name}'s claimed minimum value {self.minimum} exceeds lower bound of {self.__class__.__name__}"
            )
        if self.maximum > self.upper_bound:
            raise RuntimeWarning(
                f"Control {self.config_name}'s claimed maximum value {self.maximum} exceeds upper bound of {self.__class__.__name__}"
            )

    def _get_repr(self) -> str:
        repr = f" min={self.minimum} max={self.maximum} step={self.step}"
        repr += super()._get_repr()
        return repr

    def _convert_read(self, value):
        return value

    def _convert_write(self, value):
        """Coerce ``value`` to ``int``.

        Raises:
            ValueError: if ``int(value)`` fails; the original error is kept
                as ``__cause__`` instead of being silently discarded.
        """
        if isinstance(value, int):
            return value
        try:
            return int(value)
        except Exception as error:
            raise ValueError(f"Failed to coerce {value.__class__.__name__} '{value}' to int") from error

    def _mangle_write(self, value):
        """Clamp ``value`` to [minimum, maximum] when clipping, else validate it.

        Raises:
            ValueError: if clipping is disabled and ``value`` is out of range.
        """
        if self.clipping:
            if value < self.minimum:
                return self.minimum
            elif value > self.maximum:
                return self.maximum
        else:
            if value < self.minimum:
                raise ValueError(f"Control {self.config_name}: {value} exceeds allowed minimum {self.minimum}")
            elif value > self.maximum:
                raise ValueError(f"Control {self.config_name}: {value} exceeds allowed maximum {self.maximum}")
        return value

    def increase(self, steps: int = 1):
        """Raise the value by ``steps`` multiples of ``step``."""
        self.value += steps * self.step

    def decrease(self, steps: int = 1):
        """Lower the value by ``steps`` multiples of ``step``."""
        self.value -= steps * self.step

    def set_to_minimum(self):
        self.value = self.minimum

    def set_to_maximum(self):
        self.value = self.maximum

1354 

1355 

class IntegerControl(BaseNumericControl):
    """Numeric control bounded by the signed 32-bit integer range."""

    lower_bound = -(1 << 31)
    upper_bound = (1 << 31) - 1

1359 

1360 

class Integer64Control(BaseNumericControl):
    """Numeric control bounded by the signed 64-bit integer range."""

    lower_bound = -(1 << 63)
    upper_bound = (1 << 63) - 1

1364 

1365 

class U8Control(BaseNumericControl):
    """Numeric control bounded by the unsigned 8-bit range [0, 255]."""

    lower_bound = 0
    # Largest unsigned 8-bit value is 2**8 - 1 (was off by one).
    upper_bound = 2**8 - 1

1369 

1370 

class U16Control(BaseNumericControl):
    """Numeric control bounded by the unsigned 16-bit range [0, 65535]."""

    lower_bound = 0
    # Largest unsigned 16-bit value is 2**16 - 1 (was off by one).
    upper_bound = 2**16 - 1

1374 

1375 

class U32Control(BaseNumericControl):
    """Numeric control bounded by the unsigned 32-bit range [0, 2**32 - 1]."""

    lower_bound = 0
    # Largest unsigned 32-bit value is 2**32 - 1 (was off by one).
    upper_bound = 2**32 - 1

1379 

1380 

class BooleanControl(BaseMonoControl):
    """Boolean control accepting bools, well-known strings or any truthy/falsy value."""

    # Accepted spellings, compared case-insensitively after stripping whitespace.
    _true = ["true", "1", "yes", "on", "enable"]
    _false = ["false", "0", "no", "off", "disable"]

    def _convert_read(self, value):
        return bool(value)

    def _convert_write(self, value):
        """Coerce ``value`` to ``bool``.

        Strings must be one of the ``_true`` / ``_false`` spellings (in any
        letter case, surrounding whitespace ignored); other objects go
        through ``bool()``.

        Raises:
            ValueError: for an unrecognised string, or when ``bool(value)`` fails.
        """
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            text = value.strip().lower()
            if text in self._true:
                return True
            if text in self._false:
                return False
            raise ValueError(f"Failed to coerce {value.__class__.__name__} '{value}' to bool")
        try:
            return bool(value)
        except Exception as error:
            raise ValueError(f"Failed to coerce {value.__class__.__name__} '{value}' to bool") from error

1404 

1405 

class MenuControl(BaseMonoControl, UserDict):
    """Menu control that is also a mapping of menu index to menu entry.

    For ``MENU`` controls the entries are decoded names; for ``INTEGER_MENU``
    controls they are the items' ``value``.
    """

    def __init__(self, device, info, control_class):
        BaseControl.__init__(self, device, info, control_class)
        UserDict.__init__(self)

        if self.type == ControlType.MENU:

            def entry_of(item):
                return item.name.decode()

        elif self.type == ControlType.INTEGER_MENU:

            def entry_of(item):
                return item.value

        else:
            raise TypeError(f"MenuControl only supports control types MENU or INTEGER_MENU, but not {self.type.name}")

        self.data = {item.index: entry_of(item) for item in iter_read_menu(self.device, self)}

    def _convert_write(self, value):
        """Menu selections are written as integer indexes."""
        return int(value)

1420 

1421 

class ButtonControl(BaseControl):
    """Control with no readable value; it can only be pushed."""

    def push(self):
        """Trigger the button by writing 1 to the control."""
        pressed = 1
        self._set_control(pressed)

1425 

1426 

class CompoundControl(BaseControl):
    """Control whose value is read and written as-is, without conversion hooks."""

    @property
    def default(self):
        """Default value as returned by ``get_controls_values`` with ``DEF_VAL``."""
        values = get_controls_values(self.device, [self._info], raw.ControlWhichValue.DEF_VAL)
        return values[0]

    @property
    def value(self):
        """Current value; ``None`` for write-only controls."""
        if self.is_flagged_write_only:
            return None
        values = get_controls_values(self.device, [self._info])
        return values[0]

    @value.setter
    def value(self, value):
        # Written directly; does not go through BaseControl._set_control.
        pairs = ((self._info, value),)
        set_controls_values(self.device, pairs)

1440 

1441 

class DeviceHelper:
    """Base for helper objects that keep a reference to the device they act on."""

    def __init__(self, device: Device):
        super().__init__()
        # The device this helper operates on.
        self.device = device

1446 

1447 

class InfoEx(DeviceHelper):
    """Read-only view of a device's capabilities, formats, inputs and controls.

    The raw capability structure is read once and cached; the other
    properties query the device each time they are accessed.
    """

    INFO_REPR = """\
driver = {info.driver}
card = {info.card}
bus = {info.bus_info}
version = {info.version}
capabilities = {capabilities}
device_capabilities = {device_capabilities}
buffers = {buffers}
"""

    def __init__(self, device: "Device"):
        self.device = device
        self._raw_capabilities_cache = None

    def __repr__(self):
        def joined(members):
            return "|".join(member.name for member in members)

        dcaps = joined(flag_items(self.device_capabilities))
        caps = joined(flag_items(self.capabilities))
        buffers = joined(self.buffers)
        return self.INFO_REPR.format(info=self, capabilities=caps, device_capabilities=dcaps, buffers=buffers)

    @property
    def raw_capabilities(self) -> raw.v4l2_capability:
        """Capability structure read from the device, cached after the first read."""
        cached = self._raw_capabilities_cache
        if cached is None:
            cached = self._raw_capabilities_cache = read_capabilities(self.device)
        return cached

    @property
    def driver(self) -> str:
        return self.raw_capabilities.driver.decode()

    @property
    def card(self) -> str:
        return self.raw_capabilities.card.decode()

    @property
    def bus_info(self) -> str:
        return self.raw_capabilities.bus_info.decode()

    @property
    def version_tuple(self) -> tuple:
        """Three version components taken from bytes 2, 1 and 0 of ``version``."""
        number = self.raw_capabilities.version
        return tuple((number >> shift) & 0xFF for shift in (16, 8, 0))

    @property
    def version(self) -> str:
        """Dotted string form of ``version_tuple``."""
        return ".".join(str(part) for part in self.version_tuple)

    @property
    def capabilities(self) -> Capability:
        return Capability(self.raw_capabilities.capabilities)

    @property
    def device_capabilities(self) -> Capability:
        return Capability(self.raw_capabilities.device_caps)

    @property
    def buffers(self):
        """Buffer types whose same-named capability is in the device capabilities."""
        available = self.device_capabilities
        return [buffer_type for buffer_type in BufferType if Capability[buffer_type.name] in available]

    def get_crop_capabilities(self, buffer_type: BufferType) -> CropCapability:
        return read_crop_capabilities(self.device, buffer_type)

    @property
    def crop_capabilities(self) -> dict[BufferType, CropCapability]:
        """Crop capabilities per supported buffer type; types reporting None are omitted."""
        candidates = CROP_BUFFER_TYPES & set(self.buffers)
        found = ((buffer_type, self.get_crop_capabilities(buffer_type)) for buffer_type in candidates)
        return {buffer_type: crop_cap for buffer_type, crop_cap in found if crop_cap is not None}

    @property
    def formats(self):
        """Image formats of every supported image-format buffer type."""
        candidates = IMAGE_FORMAT_BUFFER_TYPES & set(self.buffers)
        result = []
        for buffer_type in candidates:
            result.extend(iter_read_formats(self.device, buffer_type))
        return result

    @property
    def frame_sizes(self):
        """Frame intervals for each distinct pixel format in ``formats``."""
        pixel_formats = {fmt.pixel_format for fmt in self.formats}
        return list(iter_read_pixel_formats_frame_intervals(self.device, pixel_formats))

    @property
    def inputs(self) -> list[Input]:
        return list(iter_read_inputs(self.device))

    @property
    def outputs(self):
        return list(iter_read_outputs(self.device))

    @property
    def controls(self):
        return list(iter_read_controls(self.device))

    @property
    def video_standards(self) -> list[Standard]:
        """List of video standards for the active input"""
        return list(iter_read_video_standards(self.device))

1557 

1558 

class BufferManager(DeviceHelper):
    """Device operations bound to one buffer type.

    Every method forwards to the matching ``Device`` method with
    ``self.type`` filled in. ``size`` is the number of buffers requested by
    ``create_buffers`` / ``request_buffers`` / ``enqueue_buffers``.
    """

    def __init__(self, device: Device, buffer_type: BufferType, size: int = 2):
        super().__init__(device)
        self.type = buffer_type
        self.size = size
        # Result of the last create/request call; None when nothing is reserved.
        self.buffers = None
        self.name = type(self).__name__

    def formats(self) -> list:
        """Return the device formats whose ``type`` equals this manager's buffer type."""
        formats = self.device.info.formats
        return [fmt for fmt in formats if fmt.type == self.type]

    def crop_capabilities(self):
        """Return the crop capabilities whose ``type`` equals this manager's buffer type.

        ``device.info.crop_capabilities`` is a dict keyed by buffer type, so
        its values (not its keys) are the objects carrying ``type``.
        """
        crop_capabilities = self.device.info.crop_capabilities
        return [crop for crop in crop_capabilities.values() if crop.type == self.type]

    def query_buffer(self, memory, index):
        return self.device.query_buffer(self.type, memory, index)

    def enqueue_buffer(self, memory: Memory, size: int, index: int) -> raw.v4l2_buffer:
        return self.device.enqueue_buffer(self.type, memory, size, index)

    def dequeue_buffer(self, memory: Memory) -> raw.v4l2_buffer:
        return self.device.dequeue_buffer(self.type, memory)

    def enqueue_buffers(self, memory: Memory) -> list[raw.v4l2_buffer]:
        return self.device.enqueue_buffers(self.type, memory, self.size)

    def free_buffers(self, memory: Memory):
        """Free the buffers on the device and forget the local list."""
        result = self.device.free_buffers(self.type, memory)
        self.buffers = None
        return result

    def create_buffers(self, memory: Memory):
        """Create ``size`` buffers; raises V4L2Error if buffers are already held."""
        if self.buffers:
            raise V4L2Error("buffers already requested. free first")
        self.buffers = self.device.create_buffers(self.type, memory, self.size)
        return self.buffers

    def request_buffers(self, memory: Memory):
        """Request ``size`` buffers; raises V4L2Error if buffers are already held."""
        if self.buffers:
            raise V4L2Error("buffers already requested. free first")
        self.buffers = self.device.request_buffers(self.type, memory, self.size)
        return self.buffers

    def set_format(self, width, height, pixel_format="MJPG"):
        return self.device.set_format(self.type, width, height, pixel_format)

    def get_format(self) -> Format:
        return self.device.get_format(self.type)

    def set_fps(self, fps):
        return self.device.set_fps(self.type, fps)

    def get_fps(self):
        return self.device.get_fps(self.type)

    def set_selection(self, target, rectangle):
        return self.device.set_selection(self.type, target, rectangle)

    def get_selection(self, target):
        return self.device.get_selection(self.type, target)

    def stream_on(self):
        self.device.stream_on(self.type)

    def stream_off(self):
        self.device.stream_off(self.type)

    # Aliases for stream_on / stream_off.
    start = stream_on
    stop = stream_off

1630 

1631 

class Frame:
    """The resulting object from an acquisition."""

    __slots__ = ["format", "buff", "data"]

    def __init__(self, data: bytes, buff: raw.v4l2_buffer, format: Format):
        self.data = data
        self.buff = buff
        self.format = format

    def __bytes__(self):
        return self.data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        return self.data[index]

    def __repr__(self) -> str:
        head = f"<{type(self).__name__} width={self.width}, height={self.height}, "
        tail = f"format={self.pixel_format.name}, frame_nb={self.frame_nb}, timestamp={self.timestamp}>"
        return head + tail

    @property
    def width(self):
        return self.format.width

    @property
    def height(self):
        return self.format.height

    @property
    def nbytes(self):
        """Number of bytes used, as recorded in the buffer descriptor."""
        return self.buff.bytesused

    @property
    def pixel_format(self):
        return PixelFormat(self.format.pixel_format)

    @property
    def index(self):
        return self.buff.index

    @property
    def type(self):
        return BufferType(self.buff.type)

    @property
    def flags(self):
        return BufferFlag(self.buff.flags)

    @property
    def timestamp(self):
        """Buffer timestamp as float seconds (``secs`` plus ``usecs`` scaled by 1e-6)."""
        stamp = self.buff.timestamp
        return stamp.secs + stamp.usecs * 1e-6

    @property
    def frame_nb(self):
        """Sequence number from the buffer descriptor."""
        return self.buff.sequence

    @property
    def memory(self):
        return Memory(self.buff.memory)

    @property
    def time_type(self):
        """Timecode type, or ``None`` when the buffer has no TIMECODE flag."""
        if BufferFlag.TIMECODE not in self.flags:
            return None
        return TimeCodeType(self.buff.timecode.type)

    @property
    def time_flags(self):
        """Timecode flags, or ``None`` when the buffer has no TIMECODE flag."""
        if BufferFlag.TIMECODE not in self.flags:
            return None
        return TimeCodeFlag(self.buff.timecode.flags)

    @property
    def time_frame(self):
        """Timecode frame count, or ``None`` when the buffer has no TIMECODE flag."""
        if BufferFlag.TIMECODE not in self.flags:
            return None
        return self.buff.timecode.frames

    @property
    def array(self):
        """Frame data as a flat numpy array of unsigned bytes (numpy imported lazily)."""
        import numpy

        return numpy.frombuffer(self.data, dtype="u1")

1717 

1718 

class VideoCapture(BufferManager):
    """Context manager and iterable that captures frames from a device.

    ``open`` picks a frame source from the ``source`` capabilities (the
    device capabilities when ``source`` is None): ``MemoryMap`` when STREAMING
    is present, otherwise ``ReadSource`` when READWRITE is present.
    """

    def __init__(self, device: Device, size: int = 2, source: Capability = None):
        super().__init__(device, BufferType.VIDEO_CAPTURE, size)
        self.buffer = None
        self.source = source

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, *exc):
        self.close()

    def __iter__(self):
        yield from self.buffer

    async def __aiter__(self):
        async for frame in self.buffer:
            yield frame

    def open(self):
        """Prepare the frame source and start streaming; no-op if already open.

        Raises:
            V4L2Error: the device lacks the VIDEO_CAPTURE capability.
            OSError: neither STREAMING nor READWRITE is available.
        """
        if self.buffer is not None:
            return
        log = self.device.log
        log.info("Preparing for video capture...")
        capabilities = self.device.info.device_capabilities
        if Capability.VIDEO_CAPTURE not in capabilities:
            raise V4L2Error("device lacks VIDEO_CAPTURE capability")
        source = self.source if self.source is not None else capabilities
        if Capability.STREAMING in source:
            log.info("Video capture using memory map")
            self.buffer = MemoryMap(self)
        elif Capability.READWRITE in source:
            log.info("Video capture using read")
            self.buffer = ReadSource(self)
        else:
            raise OSError("Device needs to support STREAMING or READWRITE capability")
        self.buffer.open()
        self.stream_on()
        log.info("Video capture started!")

    def close(self):
        """Stop streaming and release the frame source; no-op if not open."""
        if not self.buffer:
            return
        log = self.device.log
        log.info("Closing video capture...")
        self.stream_off()
        self.buffer.close()
        self.buffer = None
        log.info("Video capture closed")

1766 

1767 

class ReadSource(ReentrantOpen):
    """Frame source that obtains data with ``os.read`` on the device descriptor."""

    def __init__(self, buffer_manager: BufferManager):
        super().__init__()
        # buffer_manager must be set first: the ``device`` property reads it.
        self.buffer_manager = buffer_manager
        self.frame_reader = FrameReader(self.device, self.raw_read)
        self.format = None

    def __iter__(self) -> Iterator[Frame]:
        reader = self.frame_reader
        with reader:
            while True:
                yield reader.read()

    async def __aiter__(self) -> AsyncIterator[Frame]:
        reader = self.frame_reader
        async with reader:
            while True:
                yield await reader.aread()

    def open(self) -> None:
        """Remember the current format so frames can be tagged with it."""
        self.format = self.buffer_manager.get_format()

    def close(self) -> None:
        self.format = None

    @property
    def device(self) -> Device:
        return self.buffer_manager.device

    def raw_grab(self) -> tuple[bytes, raw.v4l2_buffer]:
        """Read from the device and build a buffer descriptor for the data.

        The descriptor is synthesized locally: ``bytesused`` is the length of
        the data and the timestamp is the wall-clock time taken after the read.
        """
        data = os.read(self.device.fileno(), 2**31 - 1)
        ns = time.time_ns()
        descriptor = raw.v4l2_buffer()
        descriptor.bytesused = len(data)
        descriptor.timestamp.set_ns(ns)
        return data, descriptor

    def raw_read(self) -> Frame:
        data, descriptor = self.raw_grab()
        return Frame(data, descriptor, self.format)

    def wait_read(self) -> Frame:
        """Wait for the device to become readable (when a select is available), then read."""
        device = self.device
        select = device.io.select
        if select is not None:
            device.io.select((device,), (), ())
        return self.raw_read()

    def read(self) -> Frame:
        # The first call checks how the device was opened (blocking vs
        # non-blocking) and rebinds ``self.read`` on the instance, so later
        # calls go straight to the chosen implementation. In non-blocking mode
        # we have to wait for data ourselves before reading.
        self.read = self.raw_read if self.device.is_blocking else self.wait_read
        return self.read()

1822 

1823 

1824class MemorySource(ReentrantOpen): 

1825 def __init__(self, buffer_manager: BufferManager, source: Memory): 

1826 super().__init__() 

1827 self.buffer_manager = buffer_manager 

1828 self.source = source 

1829 self.buffers = None 

1830 self.queue = BufferQueue(buffer_manager, source) 

1831 self.frame_reader = FrameReader(self.device, self.raw_read) 

1832 self.format = None 

1833 

1834 def __iter__(self) -> Iterator[Frame]: 

1835 with self.frame_reader: 

1836 yield from self.frame_reader 

1837 

1838 def __aiter__(self) -> AsyncIterator[Frame]: 

1839 return astream(self.device.fileno(), self.raw_read) 

1840 

1841 @property 

1842 def device(self) -> Device: 

1843 return self.buffer_manager.device 

1844 

1845 def prepare_buffers(self): 

1846 raise NotImplementedError 

1847 

1848 def release_buffers(self): 

1849 self.device.log.info("Freeing buffers...") 

1850 self.buffer_manager.free_buffers(self.source) 

1851 self.buffers = None 

1852 self.format = None 

1853 self.device.log.info("Buffers freed") 

1854 

1855 def open(self) -> None: 

1856 self.format = self.buffer_manager.get_format() 

1857 if self.buffers is None: 1857 ↛ exitline 1857 didn't return from function 'open' because the condition on line 1857 was always true

1858 self.prepare_buffers() 

1859 

1860 def close(self) -> None: 

1861 if self.buffers: 1861 ↛ exitline 1861 didn't return from function 'close' because the condition on line 1861 was always true

1862 self.release_buffers() 

1863 

1864 def grab_from_buffer(self, buff: raw.v4l2_buffer): 

1865 # return memoryview(self.buffers[buff.index])[: buff.bytesused], buff 

1866 return self.buffers[buff.index][: buff.bytesused], buff 

1867 

1868 def raw_grab(self) -> tuple[Buffer, raw.v4l2_buffer]: 

1869 with self.queue as buff: 

1870 return self.grab_from_buffer(buff) 

1871 

1872 def raw_read(self) -> Frame: 

1873 data, buff = self.raw_grab() 

1874 return Frame(data, buff, self.format) 

1875 

1876 def wait_read(self) -> Frame: 

1877 device = self.device 

1878 if device.io.select is not None: 

1879 device.io.select((device,), (), ()) 

1880 return self.raw_read() 

1881 

1882 def read(self) -> Frame: 

1883 # first time we check what mode device was opened (blocking vs non-blocking) 

1884 # if file was opened with O_NONBLOCK: DQBUF will not block until a buffer 

1885 # is available for read. So we need to do it here 

1886 if self.device.is_blocking: 

1887 self.read = self.raw_read 

1888 else: 

1889 self.read = self.wait_read 

1890 return self.read() 

1891 

def raw_write(self, data: Buffer) -> raw.v4l2_buffer:
    """Copy *data* into the next buffer taken from the queue.

    The byte count is ``data.nbytes`` when that attribute exists and
    ``len(data)`` otherwise. The data is written at the start of the
    buffer selected by the dequeued descriptor's ``index`` and the
    descriptor's ``bytesused`` is updated before the queue context exits.

    Returns the buffer descriptor obtained from the queue.
    """
    with self.queue as descriptor:
        nbytes = getattr(data, "nbytes", len(data))
        target = self.buffers[descriptor.index]
        target[:nbytes] = data
        descriptor.bytesused = nbytes
    return descriptor

1899 

def wait_write(self, data: Buffer) -> raw.v4l2_buffer:
    """Wait until the device is writable, then write *data*.

    When the device IO layer provides a ``select`` callable it is invoked
    with the device in the write set (no timeout); its result is unpacked
    but otherwise ignored. The write itself is delegated to ``raw_write``.
    """
    device = self.device
    select = device.io.select
    if select is not None:
        _, _writable, _ = select((), (device,), ())
    return self.raw_write(data)

1905 

def write(self, data: Buffer) -> raw.v4l2_buffer:
    """Write *data*, choosing the write strategy on first use.

    On the first call the device mode is inspected: a blocking device uses
    ``raw_write`` directly, while a device opened with O_NONBLOCK uses
    ``wait_write`` because DQBUF will not block until a buffer is available
    for writing. The chosen callable is stored on the instance as ``write``
    so later calls bypass this method entirely.
    """
    self.write = self.raw_write if self.device.is_blocking else self.wait_write
    return self.write(data)

1915 

1916 

class UserPtr(MemorySource):
    """Memory source backed by buffers allocated in this process (USERPTR)."""

    def __init__(self, buffer_manager: BufferManager):
        super().__init__(buffer_manager, Memory.USERPTR)

    def prepare_buffers(self):
        """Allocate one ctypes buffer per slot and enqueue each of them.

        Every buffer is ``self.format.size`` bytes long. A reference to each
        allocation is kept in ``self.buffers`` so the memory whose address is
        handed to the driver stays alive.
        """
        log = self.device.log
        manager = self.buffer_manager
        log.info("Reserving buffers...")
        manager.create_buffers(self.source)
        nbytes = self.format.size
        self.buffers = []
        for slot in range(manager.size):
            storage = ctypes.create_string_buffer(nbytes)
            self.buffers.append(storage)
            # Describe the allocation to the driver and hand it over.
            descriptor = raw.v4l2_buffer()
            descriptor.index = slot
            descriptor.type = manager.type
            descriptor.memory = self.source
            descriptor.m.userptr = ctypes.addressof(storage)
            descriptor.length = nbytes
            self.queue.enqueue(descriptor)
        log.info("Buffers reserved")

1937 

1938 

class MemoryMap(MemorySource):
    """Memory source backed by memory-mapped buffers (MMAP)."""

    def __init__(self, buffer_manager: BufferManager):
        super().__init__(buffer_manager, Memory.MMAP)

    def prepare_buffers(self):
        """Create the buffers, map each one and enqueue them all.

        ``self.buffers`` is assigned only after every buffer returned by the
        buffer manager has been mapped. The format is re-read from the buffer
        manager once the buffers are enqueued.
        """
        log = self.device.log
        manager = self.buffer_manager
        log.info("Reserving buffers...")
        descriptors = manager.create_buffers(self.source)
        fd = self.device.fileno()
        mapped = []
        for descriptor in descriptors:
            mapped.append(mmap_from_buffer(fd, descriptor))
        self.buffers = mapped
        manager.enqueue_buffers(Memory.MMAP)
        self.format = manager.get_format()
        log.info("Buffers reserved")

1951 

1952 

class EventReader:
    """Read events from a device, synchronously or through asyncio.

    Synchronous use: call ``read()`` or iterate over the reader.
    Asynchronous use: enter the reader with ``async with`` (the device must
    be non-blocking) and call ``aread()`` or iterate with ``async for``.
    In async mode at most ``max_queue_size`` pending events are kept; when
    the queue is full the oldest one is discarded.
    """

    def __init__(self, device: Device, max_queue_size=100):
        self.device = device
        self._loop = None
        self._selector = None
        self._buffer = None
        self._max_queue_size = max_queue_size

    async def __aenter__(self):
        """Start watching the device for events on the current event loop.

        Raises:
            V4L2Error: if the device is in blocking mode.
        """
        if self.device.is_blocking:
            raise V4L2Error("Cannot use async event reader on blocking device")
        self._buffer = asyncio.Queue(maxsize=self._max_queue_size)
        self._selector = select.epoll()
        self._loop = asyncio.get_event_loop()
        # The loop watches the epoll fd; the epoll object watches the device
        # for priority data (EPOLLPRI).
        self._loop.add_reader(self._selector.fileno(), self._on_event)
        self._selector.register(self.device.fileno(), select.EPOLLPRI)
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Stop watching the device and drop the async state."""
        self._selector.unregister(self.device.fileno())
        self._loop.remove_reader(self._selector.fileno())
        self._selector.close()
        self._selector = None
        self._loop = None
        self._buffer = None

    async def __aiter__(self):
        while True:
            yield await self.aread()

    def __iter__(self):
        while True:
            yield self.read()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        pass

    def _on_event(self):
        """Event loop callback: dequeue one event and queue it for ``aread``.

        The outcome (event or exception) is stored in a future so that an
        error is raised in the consumer awaiting ``aread`` rather than in
        the event loop callback.
        """
        task = self._loop.create_future()
        try:
            self._selector.poll(0)  # avoid blocking
            event = self.device.deque_event()
            task.set_result(event)
        except Exception as error:
            task.set_exception(error)

        buffer = self._buffer
        if buffer.full():
            self.device.log.debug("missed event")
            # asyncio.Queue has no popleft(); get_nowait() discards the
            # oldest entry to make room for the new one.
            buffer.get_nowait()
        buffer.put_nowait(task)

    def read(self, timeout=None):
        """Return the next event.

        On a non-blocking device this first waits (up to *timeout*) for the
        device to appear in the exceptional-condition set of ``select`` and
        returns ``None`` if it does not.
        """
        if not self.device.is_blocking:
            _, _, exc = self.device.io.select((), (), (self.device,), timeout)
            if not exc:
                return
        return self.device.deque_event()

    async def aread(self):
        """Wait for next event or return last event in queue"""
        task = await self._buffer.get()
        return await task

2019 

2020 

class FrameReader:
    """Read frames from a device, synchronously or through asyncio.

    ``raw_read`` is the callable that actually produces a frame.
    Synchronous use: call ``read()`` or iterate over the reader.
    Asynchronous use: enter the reader with ``async with`` (the device must
    be non-blocking) and call ``aread()``. In async mode at most
    ``max_queue_size`` pending frames are kept; when the queue is full the
    oldest one is discarded.
    """

    def __init__(self, device: Device, raw_read: Callable[[], Buffer], max_queue_size: int = 1):
        self.device = device
        self.raw_read = raw_read
        self._loop = None
        self._selector = None
        self._buffer = None
        self._max_queue_size = max_queue_size
        self._device_fd = None

    async def __aenter__(self) -> Self:
        """Start watching the device for readable data on the current loop.

        Raises:
            V4L2Error: if the device is in blocking mode.
        """
        if self.device.is_blocking:
            raise V4L2Error("Cannot use async frame reader on blocking device")
        # Remember the fd so it can still be unregistered after the device
        # object has been closed.
        self._device_fd = self.device.fileno()
        self._buffer = asyncio.Queue(maxsize=self._max_queue_size)
        self._selector = select.epoll()
        self._loop = asyncio.get_event_loop()
        self._loop.add_reader(self._selector.fileno(), self._on_event)
        # EPOLLIN is the epoll-family name for the readable event mask.
        self._selector.register(self._device_fd, select.EPOLLIN)
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Stop watching the device and drop the async state."""
        with contextlib.suppress(OSError):
            # device may have been closed by now
            self._selector.unregister(self._device_fd)
        self._loop.remove_reader(self._selector.fileno())
        self._selector.close()
        self._selector = None
        self._loop = None
        self._buffer = None

    def __enter__(self) -> Self:
        return self

    def __exit__(self, exc_type, exc_value, tb):
        pass

    def __iter__(self):
        while True:
            yield self.read()

    def _on_event(self) -> None:
        """Event loop callback: read one frame and queue it for ``aread``.

        The outcome (frame or exception) is stored in a future so that an
        error is raised in the consumer awaiting ``aread`` rather than in
        the event loop callback.
        """
        task = self._loop.create_future()
        try:
            self._selector.poll(0)  # avoid blocking
            data = self.raw_read()
            task.set_result(data)
        except Exception as error:
            task.set_exception(error)

        buffer = self._buffer
        if buffer.full():
            # Logger.warn is a deprecated alias; use warning().
            self.device.log.warning("missed frame")
            buffer.get_nowait()
        buffer.put_nowait(task)

    def read(self, timeout: Optional[float] = None) -> Optional[Frame]:
        """Return the next frame.

        On a non-blocking device this first waits (up to *timeout*) for the
        device to become readable and returns ``None`` if it does not.
        """
        if not self.device.is_blocking:
            read, _, _ = self.device.io.select((self.device,), (), (), timeout)
            if not read:
                return
        return self.raw_read()

    async def aread(self) -> Frame:
        """Wait for next frame or return last frame"""
        task = await self._buffer.get()
        return await task

2088 

2089 

class BufferQueue:
    """Context manager that dequeues a buffer on entry and re-enqueues on exit."""

    def __init__(self, buffer_manager: BufferManager, memory: Memory):
        self.buffer_manager = buffer_manager
        self.memory = memory
        # Last buffer returned by dequeue() through __enter__.
        self.raw_buffer = None

    def enqueue(self, buff: raw.v4l2_buffer):
        """Enqueue *buff* on the buffer manager's device."""
        fd = self.buffer_manager.device.fileno()
        enqueue_buffer_raw(fd, buff)

    def dequeue(self):
        """Dequeue and return the next buffer for this queue's memory type."""
        manager = self.buffer_manager
        return manager.dequeue_buffer(self.memory)

    def __enter__(self) -> raw.v4l2_buffer:
        # Fetch the next buffer and remember it for __exit__.
        dequeued = self.dequeue()
        self.raw_buffer = dequeued
        return dequeued

    def __exit__(self, *exc):
        # Enqueue a copy rather than the dequeued buffer itself: the
        # original must survive untouched because it carries the frame info
        # (frame number, timestamp, etc).
        duplicate = raw.v4l2_buffer()
        memcpy(duplicate, self.raw_buffer)
        self.enqueue(duplicate)

2113 

2114 

class Write(ReentrantOpen):
    """Output strategy that sends data through the device's ``write`` call."""

    def __init__(self, buffer_manager: BufferManager):
        super().__init__()
        self.buffer_manager = buffer_manager

    @property
    def device(self) -> Device:
        """Device owned by the buffer manager."""
        manager = self.buffer_manager
        return manager.device

    def raw_write(self, data: Buffer) -> None:
        """Write *data* to the device without waiting for readiness."""
        self.device.write(data)

    def wait_write(self, data: Buffer) -> None:
        """Wait until the device is writable, then write *data*.

        Raises:
            OSError: if ``select`` returns an empty write set.
        """
        device = self.device
        select = device.io.select
        if select is not None:
            _, writable, _ = select((), (device,), ())
            if not writable:
                raise OSError("Closed")
        self.raw_write(data)

    def write(self, data: Buffer) -> None:
        """Write *data*, choosing the write strategy on first use.

        A blocking device uses ``raw_write``; a device opened with
        O_NONBLOCK uses ``wait_write`` so the wait for readiness happens
        here. The chosen callable is stored on the instance as ``write`` so
        later calls bypass this method entirely.
        """
        self.write = self.raw_write if self.device.is_blocking else self.wait_write
        return self.write(data)

    def open(self) -> None:
        """Nothing to set up for this strategy."""

    def close(self) -> None:
        """Nothing to tear down for this strategy."""

2150 

2151 

class VideoOutput(BufferManager):
    """Buffer manager for sending frames to a video output device.

    Usable as a context manager: entering opens the output and starts the
    stream, leaving stops the stream and closes the buffer.
    """

    def __init__(self, device: Device, size: int = 2, sink: Capability = None):
        super().__init__(device, BufferType.VIDEO_OUTPUT, size)
        # Active output strategy (MemoryMap or Write); None while closed.
        self.buffer = None
        # Optional capability override used to pick the strategy in open().
        self.sink = sink

    def __enter__(self) -> Self:
        self.open()
        return self

    def __exit__(self, *exc) -> None:
        self.close()

    def open(self) -> None:
        """Select an output strategy, open it and start streaming.

        Does nothing when the output is already open.

        Raises:
            OSError: if neither STREAMING nor READWRITE is available.
        """
        if self.buffer is not None:
            return
        log = self.device.log
        log.info("Preparing for video output...")
        capabilities = self.device.info.capabilities
        # The VIDEO_OUTPUT capability is deliberately not checked: some
        # drivers (ex: v4l2loopback) don't report being output capable so
        # that apps like zoom recognize them.
        sink = self.sink if self.sink is not None else capabilities
        if Capability.STREAMING in sink:
            log.info("Video output using memory map")
            self.buffer = MemoryMap(self)
        elif Capability.READWRITE in sink:
            log.info("Video output using write")
            self.buffer = Write(self)
        else:
            raise OSError("Device needs to support STREAMING or READWRITE capability")
        self.buffer.open()
        self.stream_on()
        log.info("Video output started!")

    def close(self) -> None:
        """Stop streaming and close the output strategy, if one is open.

        Failures while stopping the stream or closing the buffer are logged
        as warnings and do not prevent the remaining cleanup.
        """
        if not self.buffer:
            return
        log = self.device.log
        log.info("Closing video output...")
        try:
            self.stream_off()
        except Exception as error:
            log.warning("Failed to close stream: %r", error)
        try:
            self.buffer.close()
        except Exception as error:
            log.warning("Failed to close buffer: %r", error)
        self.buffer = None
        log.info("Video output closed")

    def write(self, data: Buffer) -> None:
        """Send *data* through the active output strategy."""
        self.buffer.write(data)

2203 

2204 

def iter_video_files(path: PathLike = "/dev") -> Iterable[Path]:
    """Return an iterator over the device files under *path* matching ``video*``."""
    video_pattern = "video*"
    return iter_device_files(path=path, pattern=video_pattern)

2208 

2209 

def iter_devices(path: PathLike = "/dev", **kwargs) -> Iterable[Device]:
    """Return an iterator of ``Device`` objects, one per video file under *path*.

    Extra keyword arguments are forwarded to every ``Device`` constructor.
    Devices are created lazily, as the iterator is consumed.
    """
    video_files = iter_video_files(path=path)
    return (Device(filename, **kwargs) for filename in video_files)

2213 

2214 

def iter_video_capture_files(path: PathLike = "/dev") -> Iterable[Path]:
    """Return an iterator over the video files that have CAPTURE capability.

    Each candidate file is opened and its capabilities are read when the
    iterator reaches it; only files whose device capabilities include
    VIDEO_CAPTURE are yielded.
    """

    def supports_capture(filename):
        with IO.open(filename) as fobj:
            info = read_capabilities(fobj.fileno())
            device_caps = Capability(info.device_caps)
            return Capability.VIDEO_CAPTURE in device_caps

    candidates = iter_video_files(path)
    return filter(supports_capture, candidates)

2224 

2225 

def iter_video_capture_devices(path: PathLike = "/dev", **kwargs) -> Iterable[Device]:
    """Return an iterator of ``Device`` objects that have CAPTURE capability.

    Extra keyword arguments are forwarded to every ``Device`` constructor.
    """
    capture_files = iter_video_capture_files(path)
    return (Device(filename, **kwargs) for filename in capture_files)

2229 

2230 

def iter_video_output_files(path: PathLike = "/dev") -> Iterable[Path]:
    """Return an iterator over the video files that have VIDEO OUTPUT capability.

    Some drivers (ex: v4l2loopback) don't report being output capable so that
    apps like zoom recognize them as valid capture devices, so some results
    might be missing.
    """

    def supports_output(filename):
        with IO.open(filename) as fobj:
            info = read_capabilities(fobj.fileno())
            device_caps = Capability(info.device_caps)
            return Capability.VIDEO_OUTPUT in device_caps

    candidates = iter_video_files(path)
    return filter(supports_output, candidates)

2244 

2245 

def iter_video_output_devices(path: PathLike = "/dev", **kwargs) -> Iterable[Device]:
    """Return an iterator of ``Device`` objects that have VIDEO OUTPUT capability.

    Extra keyword arguments are forwarded to every ``Device`` constructor.
    """
    output_files = iter_video_output_files(path)
    return (Device(filename, **kwargs) for filename in output_files)

2249 

2250 

# Module-level finder built by make_find on top of iter_devices.
# NOTE(review): the accepted arguments and return value are defined by
# linuxpy.util.make_find, which is not visible here — confirm there.
find = make_find(iter_devices)