Coverage for linuxpy/video/device.py: 81%
1491 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-10-18 07:56 +0200
« prev ^ index » next coverage.py v7.6.1, created at 2024-10-18 07:56 +0200
1#
2# This file is part of the linuxpy project
3#
4# Copyright (c) 2023 Tiago Coutinho
5# Distributed under the GPLv3 license. See LICENSE for more info.
7"""Human friendly interface to V4L2 (Video 4 Linux 2) subsystem."""
9import asyncio
10import collections
11import contextlib
12import copy
13import ctypes
14import enum
15import errno
16import fractions
17import logging
18import mmap
19import os
20import select
21import time
22from collections import UserDict
23from pathlib import Path
25from linuxpy.ctypes import cast, cenum, create_string_buffer, memcpy, string_at
26from linuxpy.device import (
27 BaseDevice,
28 ReentrantOpen,
29 iter_device_files,
30)
31from linuxpy.io import IO
32from linuxpy.ioctl import ioctl
33from linuxpy.types import AsyncIterator, Buffer, Callable, Iterable, Iterator, Optional, PathLike, Self
34from linuxpy.util import astream, bit_indexes, make_find
36from . import raw
38log = logging.getLogger(__name__)
39log_mmap = log.getChild("mmap")
42class V4L2Error(Exception):
43 """Video for linux 2 error"""
46def _enum(name, prefix, klass=enum.IntEnum):
47 return klass(
48 name,
49 ((name.replace(prefix, ""), getattr(raw, name)) for name in dir(raw) if name.startswith(prefix)),
50 )
53FrameSizeType = raw.Frmsizetypes
54FrameIntervalType = raw.Frmivaltypes
55Field = raw.Field
56ImageFormatFlag = raw.ImageFormatFlag
57Capability = raw.Capability
58ControlID = raw.ControlID
59ControlFlag = raw.ControlFlag
60ControlType = raw.CtrlType
61ControlClass = raw.ControlClass
62SelectionTarget = raw.SelectionTarget
63EventType = raw.EventType
64EventControlChange = raw.EventControlChange
65IOC = raw.IOC
66BufferType = raw.BufType
67BufferFlag = raw.BufferFlag
68InputType = raw.InputType
69PixelFormat = raw.PixelFormat
70MetaFormat = raw.MetaFormat
71FrameSizeType = raw.Frmsizetypes
72Memory = raw.Memory
73InputStatus = raw.InputStatus
74OutputType = raw.OutputType
75InputCapabilities = raw.InputCapabilities
76OutputCapabilities = raw.OutputCapabilities
77Priority = raw.Priority
78TimeCodeType = raw.TimeCodeType
79TimeCodeFlag = raw.TimeCodeFlag
80EventSubscriptionFlag = raw.EventSubscriptionFlag
81StandardID = raw.StandardID
84def V4L2_CTRL_ID2CLASS(id_):
85 return id_ & 0x0FFF0000 # unsigned long
88def human_pixel_format(ifmt):
89 return "".join(map(chr, ((ifmt >> i) & 0xFF for i in range(0, 4 * 8, 8))))
92PixelFormat.human_str = lambda self: human_pixel_format(self.value) 92 ↛ exitline 92 didn't run the lambda on line 92
93MetaFormat.human_str = lambda self: human_pixel_format(self.value) 93 ↛ exitline 93 didn't run the lambda on line 93
96ImageFormat = collections.namedtuple("ImageFormat", "type description flags pixel_format")
98MetaFmt = collections.namedtuple("MetaFmt", "format max_buffer_size width height bytes_per_line")
100Format = collections.namedtuple("Format", "width height pixel_format size")
102CropCapability = collections.namedtuple("CropCapability", "type bounds defrect pixel_aspect")
104Rect = collections.namedtuple("Rect", "left top width height")
106Size = collections.namedtuple("Size", "width height")
108FrameType = collections.namedtuple("FrameType", "type pixel_format width height min_fps max_fps step_fps")
110Input = collections.namedtuple("InputType", "index name type audioset tuner std status capabilities")
112Output = collections.namedtuple("OutputType", "index name type audioset modulator std capabilities")
114Standard = collections.namedtuple("Standard", "index id name frameperiod framelines")
117CROP_BUFFER_TYPES = {
118 BufferType.VIDEO_CAPTURE,
119 BufferType.VIDEO_CAPTURE_MPLANE,
120 BufferType.VIDEO_OUTPUT,
121 BufferType.VIDEO_OUTPUT_MPLANE,
122 BufferType.VIDEO_OVERLAY,
123}
125IMAGE_FORMAT_BUFFER_TYPES = {
126 BufferType.VIDEO_CAPTURE,
127 BufferType.VIDEO_CAPTURE_MPLANE,
128 BufferType.VIDEO_OUTPUT,
129 BufferType.VIDEO_OUTPUT_MPLANE,
130 BufferType.VIDEO_OVERLAY,
131 BufferType.META_CAPTURE,
132 BufferType.META_OUTPUT,
133}
136def mem_map(fd, length, offset):
137 log_mmap.debug("%s, length=%d, offset=%d", fd, length, offset)
138 return mmap.mmap(fd, length, offset=offset)
141def flag_items(flag):
142 return [item for item in type(flag) if item in flag]
145def raw_crop_caps_to_crop_caps(crop):
146 return CropCapability(
147 type=BufferType(crop.type),
148 bounds=Rect(
149 crop.bounds.left,
150 crop.bounds.top,
151 crop.bounds.width,
152 crop.bounds.height,
153 ),
154 defrect=Rect(
155 crop.defrect.left,
156 crop.defrect.top,
157 crop.defrect.width,
158 crop.defrect.height,
159 ),
160 pixel_aspect=crop.pixelaspect.numerator / crop.pixelaspect.denominator,
161 )
164CropCapability.from_raw = raw_crop_caps_to_crop_caps
167def raw_read_crop_capabilities(fd, buffer_type: BufferType) -> raw.v4l2_cropcap:
168 crop = raw.v4l2_cropcap()
169 crop.type = buffer_type
170 return ioctl(fd, IOC.CROPCAP, crop)
173def read_crop_capabilities(fd, buffer_type: BufferType) -> CropCapability:
174 try:
175 crop = raw_read_crop_capabilities(fd, buffer_type)
176 except OSError as error:
177 if error.errno == errno.ENODATA: 177 ↛ 179line 177 didn't jump to line 179 because the condition on line 177 was always true
178 return None
179 raise
180 return raw_crop_caps_to_crop_caps(crop)
183ITER_BREAK = (errno.ENOTTY, errno.ENODATA, errno.EPIPE)
186def iter_read(fd, ioc, indexed_struct, start=0, stop=128, step=1, ignore_einval=False):
187 for index in range(start, stop, step):
188 indexed_struct.index = index
189 try:
190 ioctl(fd, ioc, indexed_struct)
191 yield indexed_struct
192 except OSError as error:
193 if error.errno == errno.EINVAL:
194 if ignore_einval:
195 continue
196 else:
197 break
198 elif error.errno in ITER_BREAK: 198 ↛ 201line 198 didn't jump to line 201 because the condition on line 198 was always true
199 break
200 else:
201 raise
204def iter_read_frame_intervals(fd, fmt, w, h):
205 value = raw.v4l2_frmivalenum()
206 value.pixel_format = fmt
207 value.width = w
208 value.height = h
209 count = 0
210 for val in iter_read(fd, IOC.ENUM_FRAMEINTERVALS, value):
211 # values come in frame interval (fps = 1/interval)
212 try:
213 ftype = FrameIntervalType(val.type)
214 except ValueError:
215 break
216 if ftype == FrameIntervalType.DISCRETE: 216 ↛ 219line 216 didn't jump to line 219 because the condition on line 216 was always true
217 min_fps = max_fps = step_fps = fractions.Fraction(val.discrete.denominator / val.discrete.numerator)
218 else:
219 if val.stepwise.min.numerator == 0:
220 min_fps = 0
221 else:
222 min_fps = fractions.Fraction(val.stepwise.min.denominator, val.stepwise.min.numerator)
223 if val.stepwise.max.numerator == 0:
224 max_fps = 0
225 else:
226 max_fps = fractions.Fraction(val.stepwise.max.denominator, val.stepwise.max.numerator)
227 if val.stepwise.step.numerator == 0:
228 step_fps = 0
229 else:
230 step_fps = fractions.Fraction(val.stepwise.step.denominator, val.stepwise.step.numerator)
231 yield FrameType(
232 type=ftype,
233 pixel_format=fmt,
234 width=w,
235 height=h,
236 min_fps=min_fps,
237 max_fps=max_fps,
238 step_fps=step_fps,
239 )
240 count += 1
241 if not count: 241 ↛ 243line 241 didn't jump to line 243 because the condition on line 241 was never true
242 # If it wasn't possible to get frame interval, report discovered frame size anyway
243 yield FrameType(
244 type=FrameIntervalType.DISCRETE,
245 pixel_format=fmt,
246 width=w,
247 height=h,
248 min_fps=0,
249 max_fps=0,
250 step_fps=0,
251 )
254def iter_read_discrete_frame_sizes(fd, pixel_format):
255 size = raw.v4l2_frmsizeenum()
256 size.index = 0
257 size.pixel_format = pixel_format
258 for val in iter_read(fd, IOC.ENUM_FRAMESIZES, size):
259 if size.type != FrameSizeType.DISCRETE: 259 ↛ 260line 259 didn't jump to line 260 because the condition on line 259 was never true
260 break
261 yield val
264def iter_read_pixel_formats_frame_intervals(fd, pixel_formats):
265 for pixel_format in pixel_formats:
266 for size in iter_read_discrete_frame_sizes(fd, pixel_format):
267 yield from iter_read_frame_intervals(fd, pixel_format, size.discrete.width, size.discrete.height)
270def read_capabilities(fd):
271 caps = raw.v4l2_capability()
272 ioctl(fd, IOC.QUERYCAP, caps)
273 return caps
276def iter_read_formats(fd, type):
277 format = raw.v4l2_fmtdesc()
278 format.type = type
279 pixel_formats = set(PixelFormat)
280 meta_formats = set(MetaFormat)
281 for fmt in iter_read(fd, IOC.ENUM_FMT, format):
282 pixel_fmt = fmt.pixelformat
283 if type in {BufferType.VIDEO_CAPTURE, BufferType.VIDEO_OUTPUT}:
284 if pixel_fmt not in pixel_formats: 284 ↛ 285line 284 didn't jump to line 285 because the condition on line 284 was never true
285 log.warning(
286 "ignored unknown pixel format %s (%d)",
287 human_pixel_format(pixel_fmt),
288 pixel_fmt,
289 )
290 continue
291 pixel_format = PixelFormat(pixel_fmt)
292 elif type in {BufferType.META_CAPTURE, BufferType.META_OUTPUT}: 292 ↛ 301line 292 didn't jump to line 301 because the condition on line 292 was always true
293 if pixel_fmt not in meta_formats: 293 ↛ 294line 293 didn't jump to line 294 because the condition on line 293 was never true
294 log.warning(
295 "ignored unknown meta format %s (%d)",
296 human_pixel_format(pixel_fmt),
297 pixel_fmt,
298 )
299 continue
300 pixel_format = MetaFormat(pixel_fmt)
301 image_format = ImageFormat(
302 type=type,
303 flags=ImageFormatFlag(fmt.flags),
304 description=fmt.description.decode(),
305 pixel_format=pixel_format,
306 )
307 yield image_format
310def iter_read_inputs(fd):
311 input = raw.v4l2_input()
312 for inp in iter_read(fd, IOC.ENUMINPUT, input):
313 input_type = Input(
314 index=inp.index,
315 name=inp.name.decode(),
316 type=InputType(inp.type),
317 audioset=bit_indexes(inp.audioset),
318 tuner=inp.tuner,
319 std=StandardID(inp.std),
320 status=InputStatus(inp.status),
321 capabilities=InputCapabilities(inp.capabilities),
322 )
323 yield input_type
326def iter_read_outputs(fd):
327 output = raw.v4l2_output()
328 for out in iter_read(fd, IOC.ENUMOUTPUT, output):
329 output_type = Output(
330 index=out.index,
331 name=out.name.decode(),
332 type=OutputType(out.type),
333 audioset=bit_indexes(out.audioset),
334 modulator=out.modulator,
335 std=StandardID(out.std),
336 capabilities=OutputCapabilities(out.capabilities),
337 )
338 yield output_type
341def iter_read_video_standards(fd):
342 std = raw.v4l2_standard()
343 for item in iter_read(fd, IOC.ENUMSTD, std):
344 period = item.frameperiod
345 yield Standard(
346 index=item.index,
347 id=StandardID(item.id),
348 name=item.name.decode(),
349 frameperiod=fractions.Fraction(period.denominator, period.numerator),
350 framelines=item.framelines,
351 )
354def iter_read_controls(fd):
355 ctrl = raw.v4l2_query_ext_ctrl()
356 nxt = ControlFlag.NEXT_CTRL | ControlFlag.NEXT_COMPOUND
357 ctrl.id = nxt
358 for ctrl_ext in iter_read(fd, IOC.QUERY_EXT_CTRL, ctrl):
359 yield copy.deepcopy(ctrl_ext)
360 ctrl_ext.id |= nxt
363def iter_read_menu(fd, ctrl):
364 qmenu = raw.v4l2_querymenu()
365 qmenu.id = ctrl.id
366 for menu in iter_read(
367 fd,
368 IOC.QUERYMENU,
369 qmenu,
370 start=ctrl._info.minimum,
371 stop=ctrl._info.maximum + 1,
372 step=ctrl._info.step,
373 ignore_einval=True,
374 ):
375 yield copy.deepcopy(menu)
378def query_buffer(fd, buffer_type: BufferType, memory: Memory, index: int) -> raw.v4l2_buffer:
379 buff = raw.v4l2_buffer()
380 buff.type = buffer_type
381 buff.memory = memory
382 buff.index = index
383 buff.reserved = 0
384 ioctl(fd, IOC.QUERYBUF, buff)
385 return buff
388def enqueue_buffer_raw(fd, buff: raw.v4l2_buffer) -> raw.v4l2_buffer:
389 if not buff.timestamp.secs:
390 buff.timestamp.set_ns()
391 ioctl(fd, IOC.QBUF, buff)
392 return buff
395def enqueue_buffer(fd, buffer_type: BufferType, memory: Memory, size: int, index: int) -> raw.v4l2_buffer:
396 buff = raw.v4l2_buffer()
397 buff.type = buffer_type
398 buff.memory = memory
399 buff.bytesused = size
400 buff.index = index
401 buff.field = Field.NONE
402 buff.reserved = 0
403 return enqueue_buffer_raw(fd, buff)
406def dequeue_buffer(fd, buffer_type: BufferType, memory: Memory) -> raw.v4l2_buffer:
407 buff = raw.v4l2_buffer()
408 buff.type = buffer_type
409 buff.memory = memory
410 buff.index = 0
411 buff.reserved = 0
412 ioctl(fd, IOC.DQBUF, buff)
413 return buff
416def request_buffers(fd, buffer_type: BufferType, memory: Memory, count: int) -> raw.v4l2_requestbuffers:
417 req = raw.v4l2_requestbuffers()
418 req.type = buffer_type
419 req.memory = memory
420 req.count = count
421 ioctl(fd, IOC.REQBUFS, req)
422 if not req.count: 422 ↛ 423line 422 didn't jump to line 423 because the condition on line 422 was never true
423 raise OSError("Not enough buffer memory")
424 return req
427def free_buffers(fd, buffer_type: BufferType, memory: Memory) -> raw.v4l2_requestbuffers:
428 req = raw.v4l2_requestbuffers()
429 req.type = buffer_type
430 req.memory = memory
431 req.count = 0
432 ioctl(fd, IOC.REQBUFS, req)
433 return req
436def export_buffer(fd, buffer_type: BufferType, index: int) -> int:
437 req = raw.v4l2_exportbuffer(type=buffer_type, index=index)
438 return ioctl(fd, IOC.EXPBUF, req).fd
441def create_buffers(fd, format: raw.v4l2_format, memory: Memory, count: int) -> raw.v4l2_create_buffers:
442 """Create buffers for Memory Mapped or User Pointer or DMA Buffer I/O"""
443 req = raw.v4l2_create_buffers()
444 req.format = format
445 req.memory = memory
446 req.count = count
447 ioctl(fd, IOC.CREATE_BUFS, req)
448 if not req.count:
449 raise OSError("Not enough buffer memory")
450 return req
453def set_raw_format(fd, fmt: raw.v4l2_format):
454 return ioctl(fd, IOC.S_FMT, fmt)
457def set_format(fd, buffer_type: BufferType, width: int, height: int, pixel_format: str = "MJPG"):
458 fmt = raw.v4l2_format()
459 if isinstance(pixel_format, str):
460 pixel_format = raw.v4l2_fourcc(*pixel_format)
461 fmt.type = buffer_type
462 fmt.fmt.pix.pixelformat = pixel_format
463 fmt.fmt.pix.field = Field.ANY
464 fmt.fmt.pix.width = width
465 fmt.fmt.pix.height = height
466 fmt.fmt.pix.bytesperline = 0
467 fmt.fmt.pix.sizeimage = 0
468 return set_raw_format(fd, fmt)
471def get_raw_format(fd, buffer_type) -> raw.v4l2_format:
472 fmt = raw.v4l2_format()
473 fmt.type = buffer_type
474 ioctl(fd, IOC.G_FMT, fmt)
475 return fmt
478def get_format(fd, buffer_type) -> Format:
479 f = get_raw_format(fd, buffer_type)
480 if buffer_type in {BufferType.META_CAPTURE, BufferType.META_OUTPUT}:
481 return MetaFmt(
482 format=MetaFormat(f.fmt.meta.dataformat),
483 max_buffer_size=f.fmt.meta.buffersize,
484 width=f.fmt.meta.width,
485 height=f.fmt.meta.height,
486 bytes_per_line=f.fmt.meta.bytesperline,
487 )
488 return Format(
489 width=f.fmt.pix.width,
490 height=f.fmt.pix.height,
491 pixel_format=PixelFormat(f.fmt.pix.pixelformat),
492 size=f.fmt.pix.sizeimage,
493 )
496def try_raw_format(fd, fmt: raw.v4l2_format):
497 ioctl(fd, IOC.TRY_FMT, fmt)
500def try_format(fd, buffer_type: BufferType, width: int, height: int, pixel_format: str = "MJPG"):
501 fmt = raw.v4l2_format()
502 if isinstance(pixel_format, str):
503 pixel_format = raw.v4l2_fourcc(*pixel_format)
504 fmt.type = buffer_type
505 fmt.fmt.pix.pixelformat = pixel_format
506 fmt.fmt.pix.field = Field.ANY
507 fmt.fmt.pix.width = width
508 fmt.fmt.pix.height = height
509 fmt.fmt.pix.bytesperline = 0
510 fmt.fmt.pix.sizeimage = 0
511 return try_raw_format(fd, fmt)
514def get_parm(fd, buffer_type):
515 p = raw.v4l2_streamparm()
516 p.type = buffer_type
517 ioctl(fd, IOC.G_PARM, p)
518 return p
521def set_fps(fd, buffer_type, fps):
522 # v4l2 fraction is u32
523 max_denominator = int(min(2**32, 2**32 / fps))
524 p = raw.v4l2_streamparm()
525 p.type = buffer_type
526 fps = fractions.Fraction(fps).limit_denominator(max_denominator)
527 if buffer_type == BufferType.VIDEO_CAPTURE:
528 p.parm.capture.timeperframe.numerator = fps.denominator
529 p.parm.capture.timeperframe.denominator = fps.numerator
530 elif buffer_type == BufferType.VIDEO_OUTPUT:
531 p.parm.output.timeperframe.numerator = fps.denominator
532 p.parm.output.timeperframe.denominator = fps.numerator
533 else:
534 raise ValueError(f"Unsupported buffer type {buffer_type!r}")
535 return ioctl(fd, IOC.S_PARM, p)
538def get_fps(fd, buffer_type):
539 p = get_parm(fd, buffer_type)
540 if buffer_type == BufferType.VIDEO_CAPTURE:
541 parm = p.parm.capture
542 elif buffer_type == BufferType.VIDEO_OUTPUT:
543 parm = p.parm.output
544 else:
545 raise ValueError(f"Unsupported buffer type {buffer_type!r}")
546 return fractions.Fraction(parm.timeperframe.denominator, parm.timeperframe.numerator)
549def stream_on(fd, buffer_type):
550 btype = cenum(buffer_type)
551 return ioctl(fd, IOC.STREAMON, btype)
554def stream_off(fd, buffer_type):
555 btype = cenum(buffer_type)
556 return ioctl(fd, IOC.STREAMOFF, btype)
559def set_selection(fd, buffer_type, target, rectangle):
560 sel = raw.v4l2_selection()
561 sel.type = buffer_type
562 sel.target = target
563 sel.r.left = rectangle.left
564 sel.r.top = rectangle.top
565 sel.r.width = rectangle.width
566 sel.r.height = rectangle.height
567 ioctl(fd, IOC.S_SELECTION, sel)
570def get_selection(
571 fd,
572 buffer_type: BufferType,
573 target: SelectionTarget = SelectionTarget.CROP,
574) -> Rect:
575 sel = raw.v4l2_selection()
576 sel.type = buffer_type
577 sel.target = target
578 ioctl(fd, IOC.G_SELECTION, sel)
579 return Rect(left=sel.r.left, top=sel.r.top, width=sel.r.width, height=sel.r.height)
582def get_control(fd, id):
583 control = raw.v4l2_control(id)
584 ioctl(fd, IOC.G_CTRL, control)
585 return control.value
588CTRL_TYPE_CTYPE_ARRAY = {
589 ControlType.U8: ctypes.c_uint8,
590 ControlType.U16: ctypes.c_uint16,
591 ControlType.U32: ctypes.c_uint32,
592 ControlType.INTEGER: ctypes.c_int,
593 ControlType.INTEGER64: ctypes.c_int64,
594}
597CTRL_TYPE_CTYPE_STRUCT = {
598 # ControlType.AREA: raw.v4l2_area,
599}
602def _struct_for_ctrl_type(ctrl_type):
603 ctrl_type = ControlType(ctrl_type).name.lower()
604 name = f"v4l2_ctrl_{ctrl_type}"
605 try:
606 return getattr(raw, name)
607 except AttributeError:
608 name = f"v4l2_{ctrl_type}"
609 return getattr(raw, name)
612def get_ctrl_type_struct(ctrl_type):
613 struct = CTRL_TYPE_CTYPE_STRUCT.get(ctrl_type)
614 if struct is None:
615 struct = _struct_for_ctrl_type(ctrl_type)
616 CTRL_TYPE_CTYPE_STRUCT[ctrl_type] = struct
617 return struct
620def convert_to_ctypes_array(lst, depth, ctype):
621 """Convert a list (arbitrary depth) to a ctypes array."""
622 if depth == 1:
623 return (ctype * len(lst))(*lst)
625 # Recursive case: we need to process the sub-lists first
626 sub_arrays = [convert_to_ctypes_array(sub_lst, depth - 1, ctype) for sub_lst in lst]
627 array_type = len(sub_arrays) * type(sub_arrays[0]) # Create the array type
628 return array_type(*sub_arrays)
631def _prepare_read_control_value(control: raw.v4l2_query_ext_ctrl, raw_control: raw.v4l2_ext_control):
632 raw_control.id = control.id
633 has_payload = ControlFlag.HAS_PAYLOAD in ControlFlag(control.flags)
634 if has_payload:
635 if control.type == ControlType.STRING:
636 size = control.maximum + 1
637 payload = ctypes.create_string_buffer(size)
638 raw_control.string = payload
639 raw_control.size = size
640 else:
641 ctype = CTRL_TYPE_CTYPE_ARRAY.get(control.type)
642 raw_control.size = control.elem_size * control.elems
643 if ctype is None:
644 ctype = get_ctrl_type_struct(control.type)
645 payload = ctype()
646 raw_control.ptr = ctypes.cast(ctypes.pointer(payload), ctypes.c_void_p)
647 else:
648 for i in range(control.nr_of_dims):
649 ctype *= control.dims[i]
650 payload = ctype()
651 raw_control.size = control.elem_size * control.elems
652 raw_control.ptr = ctypes.cast(payload, ctypes.c_void_p)
653 return payload
656def _get_control_value(control: raw.v4l2_query_ext_ctrl, raw_control: raw.v4l2_ext_control, data):
657 if data is None:
658 if control.type == ControlType.INTEGER64:
659 return raw_control.value64
660 return raw_control.value
661 else:
662 if control.type == ControlType.STRING:
663 return data.value.decode()
664 return data
667def get_controls_values(fd, controls: list[raw.v4l2_query_ext_ctrl], which=raw.ControlWhichValue.CUR_VAL, request_fd=0):
668 n = len(controls)
669 ctrls = raw.v4l2_ext_controls()
670 ctrls.which = which
671 ctrls.count = n
672 ctrls.request_fd = request_fd
673 ctrls.controls = (n * raw.v4l2_ext_control)()
674 values = [_prepare_read_control_value(*args) for args in zip(controls, ctrls.controls)]
675 ioctl(fd, IOC.G_EXT_CTRLS, ctrls)
676 return [_get_control_value(*args) for args in zip(controls, ctrls.controls, values)]
679def set_control(fd, id, value):
680 control = raw.v4l2_control(id, value)
681 ioctl(fd, IOC.S_CTRL, control)
684def _prepare_write_controls_values(control: raw.v4l2_query_ext_ctrl, value: object, raw_control: raw.v4l2_ext_control):
685 raw_control.id = control.id
686 has_payload = ControlFlag.HAS_PAYLOAD in ControlFlag(control.flags)
687 if has_payload:
688 if control.type == ControlType.STRING:
689 raw_control.string = ctypes.create_string_buffer(value.encode())
690 raw_control.size = len(value) + 1
691 else:
692 array_type = CTRL_TYPE_CTYPE_ARRAY.get(control.type)
693 raw_control.size = control.elem_size * control.elems
694 # a struct: assume value is proper raw struct
695 if array_type is None:
696 value = ctypes.pointer(value)
697 else:
698 value = convert_to_ctypes_array(value, control.nr_of_dims, array_type)
699 ptr = ctypes.cast(value, ctypes.c_void_p)
700 raw_control.ptr = ptr
701 else:
702 if control.type == ControlType.INTEGER64:
703 raw_control.value64 = value
704 else:
705 raw_control.value = value
708def set_controls_values(
709 fd, controls_values: list[tuple[raw.v4l2_query_ext_ctrl, object]], which=raw.ControlWhichValue.CUR_VAL, request_fd=0
710):
711 n = len(controls_values)
712 ctrls = raw.v4l2_ext_controls()
713 ctrls.which = which
714 ctrls.count = n
715 ctrls.request_fd = request_fd
716 ctrls.controls = (n * raw.v4l2_ext_control)()
717 for (control, value), raw_control in zip(controls_values, ctrls.controls):
718 _prepare_write_controls_values(control, value, raw_control)
719 ioctl(fd, IOC.S_EXT_CTRLS, ctrls)
722def get_priority(fd) -> Priority:
723 priority = ctypes.c_uint()
724 ioctl(fd, IOC.G_PRIORITY, priority)
725 return Priority(priority.value)
728def set_priority(fd, priority: Priority):
729 priority = ctypes.c_uint(priority.value)
730 ioctl(fd, IOC.S_PRIORITY, priority)
733def subscribe_event(
734 fd,
735 event_type: EventType,
736 id: int = 0,
737 flags: EventSubscriptionFlag = 0,
738):
739 sub = raw.v4l2_event_subscription()
740 sub.type = event_type
741 sub.id = id
742 sub.flags = flags
743 ioctl(fd, IOC.SUBSCRIBE_EVENT, sub)
746def unsubscribe_event(fd, event_type: EventType = EventType.ALL, id: int = 0):
747 sub = raw.v4l2_event_subscription()
748 sub.type = event_type
749 sub.id = id
750 ioctl(fd, IOC.UNSUBSCRIBE_EVENT, sub)
753def deque_event(fd) -> raw.v4l2_event:
754 event = raw.v4l2_event()
755 return ioctl(fd, IOC.DQEVENT, event)
758def set_edid(fd, edid):
759 if len(edid) % 128:
760 raise ValueError(f"EDID length {len(edid)} is not multiple of 128")
761 edid_struct = raw.v4l2_edid()
762 edid_struct.pad = 0
763 edid_struct.start_block = 0
764 edid_struct.blocks = len(edid) // 128
765 edid_array = create_string_buffer(edid)
766 edid_struct.edid = cast(edid_array, type(edid_struct.edid))
767 ioctl(fd, IOC.S_EDID, edid_struct)
770def clear_edid(fd):
771 set_edid(fd, b"")
774def get_edid(fd):
775 edid_struct = raw.v4l2_edid()
776 ioctl(fd, IOC.G_EDID, edid_struct)
777 if edid_struct.blocks == 0:
778 return b""
779 edid_len = 128 * edid_struct.blocks
780 edid_array = create_string_buffer(b"\0" * edid_len)
781 edid_struct.edid = cast(edid_array, type(edid_struct.edid))
782 ioctl(fd, IOC.G_EDID, edid_struct)
783 return string_at(edid_struct.edid, edid_len)
786def get_input(fd):
787 inp = ctypes.c_uint()
788 ioctl(fd, IOC.G_INPUT, inp)
789 return inp.value
792def set_input(fd, index: int):
793 index = ctypes.c_uint(index)
794 ioctl(fd, IOC.S_INPUT, index)
797def get_output(fd):
798 out = ctypes.c_uint()
799 ioctl(fd, IOC.G_OUTPUT, out)
800 return out.value
803def set_output(fd, index: int):
804 index = ctypes.c_uint(index)
805 ioctl(fd, IOC.S_OUTPUT, index)
808def get_std(fd) -> StandardID:
809 out = ctypes.c_uint64()
810 ioctl(fd, IOC.G_STD, out)
811 return StandardID(out.value)
814def set_std(fd, std):
815 ioctl(fd, IOC.S_STD, std)
818def query_std(fd) -> StandardID:
819 out = ctypes.c_uint64()
820 ioctl(fd, IOC.QUERYSTD, out)
821 return StandardID(out.value)
824SubdevFormat = collections.namedtuple(
825 "SubdevFormat", "pad which width height code field colorspace quantization xfer_func flags stream"
826)
829def _translate_subdev_format(fmt: raw.v4l2_subdev_format):
830 return SubdevFormat(
831 pad=fmt.pad,
832 which=raw.SubdevFormatWhence(fmt.which),
833 width=fmt.format.width,
834 height=fmt.format.height,
835 code=raw.MbusPixelcode(fmt.format.code),
836 field=raw.Field(fmt.format.field),
837 colorspace=raw.Colorspace(fmt.format.colorspace),
838 quantization=raw.Quantization(fmt.format.quantization),
839 xfer_func=raw.XferFunc(fmt.format.xfer_func),
840 flags=raw.MbusFrameFormatFlag(fmt.format.flags),
841 stream=fmt.stream,
842 )
845def get_subdevice_format(fd, pad: int = 0) -> raw.v4l2_subdev_format:
846 fmt = raw.v4l2_subdev_format(pad=pad, which=raw.SubdevFormatWhence.ACTIVE)
847 return _translate_subdev_format(ioctl(fd, IOC.SUBDEV_G_FMT, fmt))
850# Helpers
853def request_and_query_buffer(fd, buffer_type: BufferType, memory: Memory) -> raw.v4l2_buffer:
854 """request + query buffers"""
855 buffers = request_and_query_buffers(fd, buffer_type, memory, 1)
856 return buffers[0]
859def request_and_query_buffers(fd, buffer_type: BufferType, memory: Memory, count: int) -> list[raw.v4l2_buffer]:
860 """request + query buffers"""
861 request_buffers(fd, buffer_type, memory, count)
862 return [query_buffer(fd, buffer_type, memory, index) for index in range(count)]
865def mmap_from_buffer(fd, buff: raw.v4l2_buffer) -> mmap.mmap:
866 return mem_map(fd, buff.length, offset=buff.m.offset)
869def create_mmap_buffers(fd, buffer_type: BufferType, memory: Memory, count: int) -> list[mmap.mmap]:
870 """create buffers + mmap_from_buffer"""
871 return [mmap_from_buffer(fd, buff) for buff in request_and_query_buffers(fd, buffer_type, memory, count)]
874def create_mmap_buffer(fd, buffer_type: BufferType, memory: Memory) -> mmap.mmap:
875 return create_mmap_buffers(fd, buffer_type, memory, 1)
878def enqueue_buffers(fd, buffer_type: BufferType, memory: Memory, count: int) -> list[raw.v4l2_buffer]:
879 return [enqueue_buffer(fd, buffer_type, memory, 0, index) for index in range(count)]
882class Device(BaseDevice):
883 PREFIX = "/dev/video"
885 def __init__(self, name_or_file, read_write=True, io=IO):
886 self.info = None
887 self.controls = None
888 super().__init__(name_or_file, read_write=read_write, io=io)
890 def __iter__(self):
891 with VideoCapture(self) as stream:
892 yield from stream
894 async def __aiter__(self):
895 with VideoCapture(self) as stream:
896 async for frame in stream:
897 yield frame
899 def _on_open(self):
900 self.info = InfoEx(self)
901 self.controls = Controls.from_device(self)
903 def query_buffer(self, buffer_type, memory, index):
904 return query_buffer(self.fileno(), buffer_type, memory, index)
906 def enqueue_buffer(self, buffer_type: BufferType, memory: Memory, size: int, index: int) -> raw.v4l2_buffer:
907 return enqueue_buffer(self.fileno(), buffer_type, memory, size, index)
909 def dequeue_buffer(self, buffer_type: BufferType, memory: Memory) -> raw.v4l2_buffer:
910 return dequeue_buffer(self.fileno(), buffer_type, memory)
912 def request_buffers(self, buffer_type, memory, size):
913 return request_buffers(self.fileno(), buffer_type, memory, size)
915 def create_buffers(self, buffer_type: BufferType, memory: Memory, count: int) -> list[raw.v4l2_buffer]:
916 return request_and_query_buffers(self.fileno(), buffer_type, memory, count)
918 def free_buffers(self, buffer_type, memory):
919 return free_buffers(self.fileno(), buffer_type, memory)
921 def enqueue_buffers(self, buffer_type: BufferType, memory: Memory, count: int) -> list[raw.v4l2_buffer]:
922 return enqueue_buffers(self.fileno(), buffer_type, memory, count)
924 def set_format(
925 self,
926 buffer_type: BufferType,
927 width: int,
928 height: int,
929 pixel_format: str = "MJPG",
930 ):
931 return set_format(self.fileno(), buffer_type, width, height, pixel_format=pixel_format)
933 def get_format(self, buffer_type) -> Format:
934 return get_format(self.fileno(), buffer_type)
936 def set_fps(self, buffer_type, fps):
937 return set_fps(self.fileno(), buffer_type, fps)
939 def get_fps(self, buffer_type):
940 return get_fps(self.fileno(), buffer_type)
942 def set_selection(self, buffer_type, target, rectangle):
943 return set_selection(self.fileno(), buffer_type, target, rectangle)
945 def get_selection(self, buffer_type, target):
946 return get_selection(self.fileno(), buffer_type, target)
948 def get_priority(self) -> Priority:
949 return get_priority(self.fileno())
951 def set_priority(self, priority: Priority):
952 set_priority(self.fileno(), priority)
954 def stream_on(self, buffer_type):
955 self.log.info("Starting %r stream...", buffer_type.name)
956 stream_on(self.fileno(), buffer_type)
957 self.log.info("%r stream ON", buffer_type.name)
959 def stream_off(self, buffer_type):
960 self.log.info("Stoping %r stream...", buffer_type.name)
961 stream_off(self.fileno(), buffer_type)
962 self.log.info("%r stream OFF", buffer_type.name)
964 def write(self, data: bytes) -> None:
965 self._fobj.write(data)
967 def subscribe_event(
968 self,
969 event_type: EventType = EventType.ALL,
970 id: int = 0,
971 flags: EventSubscriptionFlag = 0,
972 ):
973 return subscribe_event(self.fileno(), event_type, id, flags)
975 def unsubscribe_event(self, event_type: EventType = EventType.ALL, id: int = 0):
976 return unsubscribe_event(self.fileno(), event_type, id)
978 def deque_event(self):
979 return deque_event(self.fileno())
981 def set_edid(self, edid):
982 set_edid(self.fileno(), edid)
984 def clear_edid(self):
985 clear_edid(self.fileno())
987 def get_edid(self):
988 return get_edid(self.fileno())
990 def get_input(self):
991 return get_input(self.fileno())
993 def set_input(self, index: int):
994 return set_input(self.fileno(), index)
996 def get_output(self):
997 return get_output(self.fileno())
999 def set_output(self, index: int):
1000 return set_output(self.fileno(), index)
1002 def get_std(self) -> StandardID:
1003 return get_std(self.fileno())
1005 def set_std(self, std):
1006 return set_std(self.fileno(), std)
1008 def query_std(self) -> StandardID:
1009 return query_std(self.fileno())
1012class SubDevice(BaseDevice):
1013 def get_format(self, pad: int = 0) -> SubdevFormat:
1014 return get_subdevice_format(self, pad=pad)
1017def create_artificial_control_class(class_id):
1018 return raw.v4l2_query_ext_ctrl(
1019 id=class_id | 1,
1020 name=b"Generic Controls",
1021 type=ControlType.CTRL_CLASS,
1022 )
1025class Controls(dict):
1026 def __init__(self, device: Device):
1027 super().__init__()
1028 self.__dict__["_device"] = device
1029 self.__dict__["_initialized"] = False
1031 def _init_if_needed(self):
1032 if not self._initialized:
1033 self._load()
1034 self.__dict__["_initialized"] = True
1036 def __getitem__(self, name):
1037 self._init_if_needed()
1038 return super().__getitem__(name)
1040 def __len__(self):
1041 self._init_if_needed()
1042 return super().__len__()
1044 def _load(self):
1045 ctrl_type_map = {
1046 ControlType.BOOLEAN: BooleanControl,
1047 ControlType.INTEGER: IntegerControl,
1048 ControlType.INTEGER64: Integer64Control,
1049 ControlType.MENU: MenuControl,
1050 ControlType.INTEGER_MENU: MenuControl,
1051 ControlType.U8: U8Control,
1052 ControlType.U16: U16Control,
1053 ControlType.U32: U32Control,
1054 ControlType.BUTTON: ButtonControl,
1055 }
1056 classes = {}
1057 for ctrl in self._device.info.controls:
1058 ctrl_type = ControlType(ctrl.type)
1059 ctrl_class_id = V4L2_CTRL_ID2CLASS(ctrl.id)
1060 if ctrl_type == ControlType.CTRL_CLASS:
1061 classes[ctrl_class_id] = ctrl
1062 else:
1063 klass = classes.get(ctrl_class_id)
1064 if klass is None:
1065 klass = create_artificial_control_class(ctrl_class_id)
1066 classes[ctrl_class_id] = klass
1067 has_payload = ControlFlag.HAS_PAYLOAD in ControlFlag(ctrl.flags)
1068 if has_payload:
1069 ctrl_class = CompoundControl
1070 else:
1071 ctrl_class = ctrl_type_map.get(ctrl_type, GenericControl)
1072 self[ctrl.id] = ctrl_class(self._device, ctrl, klass)
1074 @classmethod
1075 def from_device(cls, device):
1076 """Deprecated: backward compatible. Please use Controls(device) constructor directly"""
1077 return cls(device)
1079 def __getattr__(self, key):
1080 with contextlib.suppress(KeyError):
1081 return self[key]
1082 raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{key}'")
1084 def __setattr__(self, key, value):
1085 self._init_if_needed()
1086 self[key] = value
1088 def __missing__(self, key):
1089 self._init_if_needed()
1090 for v in self.values():
1091 if isinstance(v, BaseControl) and (v.config_name == key):
1092 return v
1093 raise KeyError(key)
1095 def values(self):
1096 self._init_if_needed()
1097 return super().values()
1099 def used_classes(self):
1100 class_map = {v.control_class.id: v.control_class for v in self.values() if isinstance(v, BaseControl)}
1101 return list(class_map.values())
1103 def with_class(self, control_class):
1104 if isinstance(control_class, str):
1105 control_class = ControlClass[control_class.upper()]
1106 elif isinstance(control_class, int): 1106 ↛ 1108line 1106 didn't jump to line 1108 because the condition on line 1106 was always true
1107 control_class = ControlClass(control_class)
1108 elif not isinstance(control_class, ControlClass):
1109 control_class = ControlClass(control_class.id - 1)
1110 for v in self.values(): 1110 ↛ exitline 1110 didn't return from function 'with_class' because the loop on line 1110 didn't complete
1111 if not isinstance(v, BaseControl): 1111 ↛ 1112line 1111 didn't jump to line 1112 because the condition on line 1111 was never true
1112 continue
1113 if v.control_class.id - 1 == control_class: 1113 ↛ 1110line 1113 didn't jump to line 1110 because the condition on line 1113 was always true
1114 yield v
1116 def set_to_default(self):
1117 for v in self.values():
1118 if not isinstance(v, BaseControl):
1119 continue
1121 with contextlib.suppress(AttributeError):
1122 v.set_to_default()
1124 def set_clipping(self, clipping: bool) -> None:
1125 for v in self.values():
1126 if isinstance(v, BaseNumericControl):
1127 v.clipping = clipping
1130class BaseControl:
1131 def __init__(self, device, info, control_class):
1132 self.device = device
1133 self._info = info
1134 self.id = self._info.id
1135 self.name = self._info.name.decode()
1136 self._config_name = None
1137 self.control_class = control_class
1138 self.type = ControlType(self._info.type)
1139 self.flags = ControlFlag(self._info.flags)
1141 try:
1142 self.standard = ControlID(self.id)
1143 except ValueError:
1144 self.standard = None
1146 def __repr__(self):
1147 repr = str(self.config_name)
1149 addrepr = self._get_repr()
1150 addrepr = addrepr.strip()
1151 if addrepr:
1152 repr += f" {addrepr}"
1154 if self.flags:
1155 flags = [flag.name.lower() for flag in ControlFlag if ((self._info.flags & flag) == flag)]
1156 repr += " flags=" + ",".join(flags)
1158 return f"<{type(self).__name__} {repr}>"
1160 def _get_repr(self) -> str:
1161 return ""
1163 def _get_control(self):
1164 # value = get_controls_values(self.device, [self._info])[0]
1165 value = get_controls_values(self.device, (self._info,))[0]
1166 return value
1168 def _set_control(self, value):
1169 if not self.is_writeable:
1170 reasons = []
1171 if self.is_flagged_read_only:
1172 reasons.append("read-only")
1173 if self.is_flagged_inactive: 1173 ↛ 1174line 1173 didn't jump to line 1174 because the condition on line 1173 was never true
1174 reasons.append("inactive")
1175 if self.is_flagged_disabled:
1176 reasons.append("disabled")
1177 if self.is_flagged_grabbed: 1177 ↛ 1178line 1177 didn't jump to line 1178 because the condition on line 1177 was never true
1178 reasons.append("grabbed")
1179 raise AttributeError(f"{self.__class__.__name__} {self.config_name} is not writeable: {', '.join(reasons)}")
1180 set_controls_values(self.device, ((self._info, value),))
1182 @property
1183 def config_name(self) -> str:
1184 if self._config_name is None:
1185 res = self.name.lower()
1186 for r in ("(", ")"):
1187 res = res.replace(r, "")
1188 for r in (", ", " "):
1189 res = res.replace(r, "_")
1190 self._config_name = res
1191 return self._config_name
1193 @property
1194 def is_flagged_disabled(self) -> bool:
1195 return ControlFlag.DISABLED in self.flags
1197 @property
1198 def is_flagged_grabbed(self) -> bool:
1199 return ControlFlag.GRABBED in self.flags
1201 @property
1202 def is_flagged_read_only(self) -> bool:
1203 return ControlFlag.READ_ONLY in self.flags
1205 @property
1206 def is_flagged_update(self) -> bool:
1207 return ControlFlag.UPDATE in self.flags
1209 @property
1210 def is_flagged_inactive(self) -> bool:
1211 return ControlFlag.INACTIVE in self.flags
1213 @property
1214 def is_flagged_slider(self) -> bool:
1215 return ControlFlag.SLIDER in self.flags
1217 @property
1218 def is_flagged_write_only(self) -> bool:
1219 return ControlFlag.WRITE_ONLY in self.flags
1221 @property
1222 def is_flagged_volatile(self) -> bool:
1223 return ControlFlag.VOLATILE in self.flags
1225 @property
1226 def is_flagged_has_payload(self) -> bool:
1227 return ControlFlag.HAS_PAYLOAD in self.flags
1229 @property
1230 def is_flagged_execute_on_write(self) -> bool:
1231 return ControlFlag.EXECUTE_ON_WRITE in self.flags
1233 @property
1234 def is_flagged_modify_layout(self) -> bool:
1235 return ControlFlag.MODIFY_LAYOUT in self.flags
1237 @property
1238 def is_flagged_dynamic_array(self) -> bool:
1239 return ControlFlag.DYNAMIC_ARRAY in self.flags
1241 @property
1242 def is_writeable(self) -> bool:
1243 return not (self.is_flagged_read_only or self.is_flagged_disabled or self.is_flagged_grabbed)
1246class BaseMonoControl(BaseControl):
1247 def _get_repr(self) -> str:
1248 repr = f" default={self.default}"
1249 if not self.is_flagged_write_only: 1249 ↛ 1254line 1249 didn't jump to line 1254 because the condition on line 1249 was always true
1250 try:
1251 repr += f" value={self.value}"
1252 except Exception as error:
1253 repr += f" value=<error: {error!r}>"
1254 return repr
1256 def _convert_read(self, value):
1257 return value
1259 @property
1260 def default(self):
1261 default = get_controls_values(self.device, (self._info,), raw.ControlWhichValue.DEF_VAL)[0]
1262 return self._convert_read(default)
1264 @property
1265 def value(self):
1266 if not self.is_flagged_write_only: 1266 ↛ exitline 1266 didn't return from function 'value' because the condition on line 1266 was always true
1267 v = self._get_control()
1268 return self._convert_read(v)
1270 def _convert_write(self, value):
1271 return value
1273 def _mangle_write(self, value):
1274 return value
1276 @value.setter
1277 def value(self, value):
1278 v = self._convert_write(value)
1279 v = self._mangle_write(v)
1280 self._set_control(v)
1282 def set_to_default(self):
1283 self.value = self.default
1286class GenericControl(BaseMonoControl):
1287 pass
1290class BaseNumericControl(BaseMonoControl):
1291 lower_bound = -(2**31)
1292 upper_bound = 2**31 - 1
1294 def __init__(self, device, info, control_class, clipping=True):
1295 super().__init__(device, info, control_class)
1296 self.minimum = self._info.minimum
1297 self.maximum = self._info.maximum
1298 self.step = self._info.step
1299 self.clipping = clipping
1301 if self.minimum < self.lower_bound: 1301 ↛ 1302line 1301 didn't jump to line 1302 because the condition on line 1301 was never true
1302 raise RuntimeWarning(
1303 f"Control {self.config_name}'s claimed minimum value {self.minimum} exceeds lower bound of {self.__class__.__name__}"
1304 )
1305 if self.maximum > self.upper_bound: 1305 ↛ 1306line 1305 didn't jump to line 1306 because the condition on line 1305 was never true
1306 raise RuntimeWarning(
1307 f"Control {self.config_name}'s claimed maximum value {self.maximum} exceeds upper bound of {self.__class__.__name__}"
1308 )
1310 def _get_repr(self) -> str:
1311 repr = f" min={self.minimum} max={self.maximum} step={self.step}"
1312 repr += super()._get_repr()
1313 return repr
1315 def _convert_read(self, value):
1316 return value
1318 def _convert_write(self, value):
1319 if isinstance(value, int):
1320 return value
1321 else:
1322 try:
1323 v = int(value)
1324 except Exception:
1325 pass
1326 else:
1327 return v
1328 raise ValueError(f"Failed to coerce {value.__class__.__name__} '{value}' to int")
1330 def _mangle_write(self, value):
1331 if self.clipping:
1332 if value < self.minimum:
1333 return self.minimum
1334 elif value > self.maximum:
1335 return self.maximum
1336 else:
1337 if value < self.minimum:
1338 raise ValueError(f"Control {self.config_name}: {value} exceeds allowed minimum {self.minimum}")
1339 elif value > self.maximum: 1339 ↛ 1341line 1339 didn't jump to line 1341 because the condition on line 1339 was always true
1340 raise ValueError(f"Control {self.config_name}: {value} exceeds allowed maximum {self.maximum}")
1341 return value
1343 def increase(self, steps: int = 1):
1344 self.value += steps * self.step
1346 def decrease(self, steps: int = 1):
1347 self.value -= steps * self.step
1349 def set_to_minimum(self):
1350 self.value = self.minimum
1352 def set_to_maximum(self):
1353 self.value = self.maximum
1356class IntegerControl(BaseNumericControl):
1357 lower_bound = -(2**31)
1358 upper_bound = 2**31 - 1
1361class Integer64Control(BaseNumericControl):
1362 lower_bound = -(2**63)
1363 upper_bound = 2**63 - 1
1366class U8Control(BaseNumericControl):
1367 lower_bound = 0
1368 upper_bound = 2**8
1371class U16Control(BaseNumericControl):
1372 lower_bound = 0
1373 upper_bound = 2**16
1376class U32Control(BaseNumericControl):
1377 lower_bound = 0
1378 upper_bound = 2**32
1381class BooleanControl(BaseMonoControl):
1382 _true = ["true", "1", "yes", "on", "enable"]
1383 _false = ["false", "0", "no", "off", "disable"]
1385 def _convert_read(self, value):
1386 return bool(value)
1388 def _convert_write(self, value):
1389 if isinstance(value, bool):
1390 return value
1391 elif isinstance(value, str):
1392 if value in self._true:
1393 return True
1394 elif value in self._false: 1394 ↛ 1403line 1394 didn't jump to line 1403 because the condition on line 1394 was always true
1395 return False
1396 else:
1397 try:
1398 v = bool(value)
1399 except Exception:
1400 pass
1401 else:
1402 return v
1403 raise ValueError(f"Failed to coerce {value.__class__.__name__} '{value}' to bool")
1406class MenuControl(BaseMonoControl, UserDict):
1407 def __init__(self, device, info, control_class):
1408 BaseControl.__init__(self, device, info, control_class)
1409 UserDict.__init__(self)
1411 if self.type == ControlType.MENU:
1412 self.data = {item.index: item.name.decode() for item in iter_read_menu(self.device, self)}
1413 elif self.type == ControlType.INTEGER_MENU: 1413 ↛ 1416line 1413 didn't jump to line 1416 because the condition on line 1413 was always true
1414 self.data = {item.index: item.value for item in iter_read_menu(self.device, self)}
1415 else:
1416 raise TypeError(f"MenuControl only supports control types MENU or INTEGER_MENU, but not {self.type.name}")
1418 def _convert_write(self, value):
1419 return int(value)
1422class ButtonControl(BaseControl):
1423 def push(self):
1424 self._set_control(1)
1427class CompoundControl(BaseControl):
1428 @property
1429 def default(self):
1430 return get_controls_values(self.device, [self._info], raw.ControlWhichValue.DEF_VAL)[0]
1432 @property
1433 def value(self):
1434 if not self.is_flagged_write_only: 1434 ↛ exitline 1434 didn't return from function 'value' because the condition on line 1434 was always true
1435 return get_controls_values(self.device, [self._info])[0]
1437 @value.setter
1438 def value(self, value):
1439 set_controls_values(self.device, ((self._info, value),))
1442class DeviceHelper:
1443 def __init__(self, device: Device):
1444 super().__init__()
1445 self.device = device
1448class InfoEx(DeviceHelper):
1449 INFO_REPR = """\
1450driver = {info.driver}
1451card = {info.card}
1452bus = {info.bus_info}
1453version = {info.version}
1454capabilities = {capabilities}
1455device_capabilities = {device_capabilities}
1456buffers = {buffers}
1457"""
1459 def __init__(self, device: "Device"):
1460 self.device = device
1461 self._raw_capabilities_cache = None
1463 def __repr__(self):
1464 dcaps = "|".join(cap.name for cap in flag_items(self.device_capabilities))
1465 caps = "|".join(cap.name for cap in flag_items(self.capabilities))
1466 buffers = "|".join(buff.name for buff in self.buffers)
1467 return self.INFO_REPR.format(info=self, capabilities=caps, device_capabilities=dcaps, buffers=buffers)
1469 @property
1470 def raw_capabilities(self) -> raw.v4l2_capability:
1471 if self._raw_capabilities_cache is None:
1472 self._raw_capabilities_cache = read_capabilities(self.device)
1473 return self._raw_capabilities_cache
1475 @property
1476 def driver(self) -> str:
1477 return self.raw_capabilities.driver.decode()
1479 @property
1480 def card(self) -> str:
1481 return self.raw_capabilities.card.decode()
1483 @property
1484 def bus_info(self) -> str:
1485 return self.raw_capabilities.bus_info.decode()
1487 @property
1488 def version_tuple(self) -> tuple:
1489 caps = self.raw_capabilities
1490 return (
1491 (caps.version & 0xFF0000) >> 16,
1492 (caps.version & 0x00FF00) >> 8,
1493 (caps.version & 0x0000FF),
1494 )
1496 @property
1497 def version(self) -> str:
1498 return ".".join(map(str, self.version_tuple))
1500 @property
1501 def capabilities(self) -> Capability:
1502 return Capability(self.raw_capabilities.capabilities)
1504 @property
1505 def device_capabilities(self) -> Capability:
1506 return Capability(self.raw_capabilities.device_caps)
1508 @property
1509 def buffers(self):
1510 dev_caps = self.device_capabilities
1511 return [typ for typ in BufferType if Capability[typ.name] in dev_caps]
1513 def get_crop_capabilities(self, buffer_type: BufferType) -> CropCapability:
1514 return read_crop_capabilities(self.device, buffer_type)
1516 @property
1517 def crop_capabilities(self) -> dict[BufferType, CropCapability]:
1518 buffer_types = CROP_BUFFER_TYPES & set(self.buffers)
1519 result = {}
1520 for buffer_type in buffer_types:
1521 crop_cap = self.get_crop_capabilities(buffer_type)
1522 if crop_cap is None:
1523 continue
1524 result[buffer_type] = crop_cap
1525 return result
1527 @property
1528 def formats(self):
1529 img_fmt_buffer_types = IMAGE_FORMAT_BUFFER_TYPES & set(self.buffers)
1530 return [
1531 image_format
1532 for buffer_type in img_fmt_buffer_types
1533 for image_format in iter_read_formats(self.device, buffer_type)
1534 ]
1536 @property
1537 def frame_sizes(self):
1538 pixel_formats = {fmt.pixel_format for fmt in self.formats}
1539 return list(iter_read_pixel_formats_frame_intervals(self.device, pixel_formats))
1541 @property
1542 def inputs(self) -> list[Input]:
1543 return list(iter_read_inputs(self.device))
1545 @property
1546 def outputs(self):
1547 return list(iter_read_outputs(self.device))
1549 @property
1550 def controls(self):
1551 return list(iter_read_controls(self.device))
1553 @property
1554 def video_standards(self) -> list[Standard]:
1555 """List of video standards for the active input"""
1556 return list(iter_read_video_standards(self.device))
1559class BufferManager(DeviceHelper):
1560 def __init__(self, device: Device, buffer_type: BufferType, size: int = 2):
1561 super().__init__(device)
1562 self.type = buffer_type
1563 self.size = size
1564 self.buffers = None
1565 self.name = type(self).__name__
1567 def formats(self) -> list:
1568 formats = self.device.info.formats
1569 return [fmt for fmt in formats if fmt.type == self.type]
1571 def crop_capabilities(self):
1572 crop_capabilities = self.device.info.crop_capabilities
1573 return [crop for crop in crop_capabilities if crop.type == self.type]
1575 def query_buffer(self, memory, index):
1576 return self.device.query_buffer(self.type, memory, index)
1578 def enqueue_buffer(self, memory: Memory, size: int, index: int) -> raw.v4l2_buffer:
1579 return self.device.enqueue_buffer(self.type, memory, size, index)
1581 def dequeue_buffer(self, memory: Memory) -> raw.v4l2_buffer:
1582 return self.device.dequeue_buffer(self.type, memory)
1584 def enqueue_buffers(self, memory: Memory) -> list[raw.v4l2_buffer]:
1585 return self.device.enqueue_buffers(self.type, memory, self.size)
1587 def free_buffers(self, memory: Memory):
1588 result = self.device.free_buffers(self.type, memory)
1589 self.buffers = None
1590 return result
1592 def create_buffers(self, memory: Memory):
1593 if self.buffers: 1593 ↛ 1594line 1593 didn't jump to line 1594 because the condition on line 1593 was never true
1594 raise V4L2Error("buffers already requested. free first")
1595 self.buffers = self.device.create_buffers(self.type, memory, self.size)
1596 return self.buffers
1598 def request_buffers(self, memory: Memory):
1599 if self.buffers:
1600 raise V4L2Error("buffers already requested. free first")
1601 self.buffers = self.device.request_buffers(self.type, memory, self.size)
1602 return self.buffers
1604 def set_format(self, width, height, pixel_format="MJPG"):
1605 return self.device.set_format(self.type, width, height, pixel_format)
1607 def get_format(self) -> Format:
1608 return self.device.get_format(self.type)
1610 def set_fps(self, fps):
1611 return self.device.set_fps(self.type, fps)
1613 def get_fps(self):
1614 return self.device.get_fps(self.type)
1616 def set_selection(self, target, rectangle):
1617 return self.device.set_selection(self.type, target, rectangle)
1619 def get_selection(self, target):
1620 return self.device.get_selection(self.type, target)
1622 def stream_on(self):
1623 self.device.stream_on(self.type)
1625 def stream_off(self):
1626 self.device.stream_off(self.type)
1628 start = stream_on
1629 stop = stream_off
1632class Frame:
1633 """The resulting object from an acquisition."""
1635 __slots__ = ["format", "buff", "data"]
1637 def __init__(self, data: bytes, buff: raw.v4l2_buffer, format: Format):
1638 self.format = format
1639 self.buff = buff
1640 self.data = data
1642 def __bytes__(self):
1643 return self.data
1645 def __len__(self):
1646 return len(self.data)
1648 def __getitem__(self, index):
1649 return self.data[index]
1651 def __repr__(self) -> str:
1652 return (
1653 f"<{type(self).__name__} width={self.width}, height={self.height}, "
1654 f"format={self.pixel_format.name}, frame_nb={self.frame_nb}, timestamp={self.timestamp}>"
1655 )
1657 @property
1658 def width(self):
1659 return self.format.width
1661 @property
1662 def height(self):
1663 return self.format.height
1665 @property
1666 def nbytes(self):
1667 return self.buff.bytesused
1669 @property
1670 def pixel_format(self):
1671 return PixelFormat(self.format.pixel_format)
1673 @property
1674 def index(self):
1675 return self.buff.index
1677 @property
1678 def type(self):
1679 return BufferType(self.buff.type)
1681 @property
1682 def flags(self):
1683 return BufferFlag(self.buff.flags)
1685 @property
1686 def timestamp(self):
1687 return self.buff.timestamp.secs + self.buff.timestamp.usecs * 1e-6
1689 @property
1690 def frame_nb(self):
1691 return self.buff.sequence
1693 @property
1694 def memory(self):
1695 return Memory(self.buff.memory)
1697 @property
1698 def time_type(self):
1699 if BufferFlag.TIMECODE in self.flags:
1700 return TimeCodeType(self.buff.timecode.type)
1702 @property
1703 def time_flags(self):
1704 if BufferFlag.TIMECODE in self.flags:
1705 return TimeCodeFlag(self.buff.timecode.flags)
1707 @property
1708 def time_frame(self):
1709 if BufferFlag.TIMECODE in self.flags:
1710 return self.buff.timecode.frames
1712 @property
1713 def array(self):
1714 import numpy
1716 return numpy.frombuffer(self.data, dtype="u1")
1719class VideoCapture(BufferManager):
1720 def __init__(self, device: Device, size: int = 2, source: Capability = None):
1721 super().__init__(device, BufferType.VIDEO_CAPTURE, size)
1722 self.buffer = None
1723 self.source = source
1725 def __enter__(self):
1726 self.open()
1727 return self
1729 def __exit__(self, *exc):
1730 self.close()
1732 def __iter__(self):
1733 yield from self.buffer
1735 async def __aiter__(self):
1736 async for frame in self.buffer: 1736 ↛ exitline 1736 didn't return from function '__aiter__' because the loop on line 1736 didn't complete
1737 yield frame
1739 def open(self):
1740 if self.buffer is None: 1740 ↛ exitline 1740 didn't return from function 'open' because the condition on line 1740 was always true
1741 self.device.log.info("Preparing for video capture...")
1742 capabilities = self.device.info.device_capabilities
1743 if Capability.VIDEO_CAPTURE not in capabilities:
1744 raise V4L2Error("device lacks VIDEO_CAPTURE capability")
1745 source = capabilities if self.source is None else self.source
1746 if Capability.STREAMING in source: 1746 ↛ 1750line 1746 didn't jump to line 1750 because the condition on line 1746 was always true
1747 self.device.log.info("Video capture using memory map")
1748 self.buffer = MemoryMap(self)
1749 # self.buffer = UserPtr(self)
1750 elif Capability.READWRITE in source:
1751 self.device.log.info("Video capture using read")
1752 self.buffer = ReadSource(self)
1753 else:
1754 raise OSError("Device needs to support STREAMING or READWRITE capability")
1755 self.buffer.open()
1756 self.stream_on()
1757 self.device.log.info("Video capture started!")
1759 def close(self):
1760 if self.buffer: 1760 ↛ exitline 1760 didn't return from function 'close' because the condition on line 1760 was always true
1761 self.device.log.info("Closing video capture...")
1762 self.stream_off()
1763 self.buffer.close()
1764 self.buffer = None
1765 self.device.log.info("Video capture closed")
1768class ReadSource(ReentrantOpen):
1769 def __init__(self, buffer_manager: BufferManager):
1770 super().__init__()
1771 self.buffer_manager = buffer_manager
1772 self.frame_reader = FrameReader(self.device, self.raw_read)
1773 self.format = None
1775 def __iter__(self) -> Iterator[Frame]:
1776 with self.frame_reader:
1777 while True:
1778 yield self.frame_reader.read()
1780 async def __aiter__(self) -> AsyncIterator[Frame]:
1781 async with self.frame_reader:
1782 while True:
1783 yield await self.frame_reader.aread()
1785 def open(self) -> None:
1786 self.format = self.buffer_manager.get_format()
1788 def close(self) -> None:
1789 self.format = None
1791 @property
1792 def device(self) -> Device:
1793 return self.buffer_manager.device
1795 def raw_grab(self) -> tuple[bytes, raw.v4l2_buffer]:
1796 data = os.read(self.device.fileno(), 2**31 - 1)
1797 ns = time.time_ns()
1798 buff = raw.v4l2_buffer()
1799 buff.bytesused = len(data)
1800 buff.timestamp.set_ns(ns)
1801 return data, buff
1803 def raw_read(self) -> Frame:
1804 data, buff = self.raw_grab()
1805 return Frame(data, buff, self.format)
1807 def wait_read(self) -> Frame:
1808 device = self.device
1809 if device.io.select is not None:
1810 device.io.select((device,), (), ())
1811 return self.raw_read()
1813 def read(self) -> Frame:
1814 # first time we check what mode device was opened (blocking vs non-blocking)
1815 # if file was opened with O_NONBLOCK: DQBUF will not block until a buffer
1816 # is available for read. So we need to do it here
1817 if self.device.is_blocking:
1818 self.read = self.raw_read
1819 else:
1820 self.read = self.wait_read
1821 return self.read()
1824class MemorySource(ReentrantOpen):
1825 def __init__(self, buffer_manager: BufferManager, source: Memory):
1826 super().__init__()
1827 self.buffer_manager = buffer_manager
1828 self.source = source
1829 self.buffers = None
1830 self.queue = BufferQueue(buffer_manager, source)
1831 self.frame_reader = FrameReader(self.device, self.raw_read)
1832 self.format = None
1834 def __iter__(self) -> Iterator[Frame]:
1835 with self.frame_reader:
1836 yield from self.frame_reader
1838 def __aiter__(self) -> AsyncIterator[Frame]:
1839 return astream(self.device.fileno(), self.raw_read)
1841 @property
1842 def device(self) -> Device:
1843 return self.buffer_manager.device
1845 def prepare_buffers(self):
1846 raise NotImplementedError
1848 def release_buffers(self):
1849 self.device.log.info("Freeing buffers...")
1850 self.buffer_manager.free_buffers(self.source)
1851 self.buffers = None
1852 self.format = None
1853 self.device.log.info("Buffers freed")
1855 def open(self) -> None:
1856 self.format = self.buffer_manager.get_format()
1857 if self.buffers is None: 1857 ↛ exitline 1857 didn't return from function 'open' because the condition on line 1857 was always true
1858 self.prepare_buffers()
1860 def close(self) -> None:
1861 if self.buffers: 1861 ↛ exitline 1861 didn't return from function 'close' because the condition on line 1861 was always true
1862 self.release_buffers()
1864 def grab_from_buffer(self, buff: raw.v4l2_buffer):
1865 # return memoryview(self.buffers[buff.index])[: buff.bytesused], buff
1866 return self.buffers[buff.index][: buff.bytesused], buff
1868 def raw_grab(self) -> tuple[Buffer, raw.v4l2_buffer]:
1869 with self.queue as buff:
1870 return self.grab_from_buffer(buff)
1872 def raw_read(self) -> Frame:
1873 data, buff = self.raw_grab()
1874 return Frame(data, buff, self.format)
1876 def wait_read(self) -> Frame:
1877 device = self.device
1878 if device.io.select is not None:
1879 device.io.select((device,), (), ())
1880 return self.raw_read()
1882 def read(self) -> Frame:
1883 # first time we check what mode device was opened (blocking vs non-blocking)
1884 # if file was opened with O_NONBLOCK: DQBUF will not block until a buffer
1885 # is available for read. So we need to do it here
1886 if self.device.is_blocking:
1887 self.read = self.raw_read
1888 else:
1889 self.read = self.wait_read
1890 return self.read()
1892 def raw_write(self, data: Buffer) -> raw.v4l2_buffer:
1893 with self.queue as buff:
1894 size = getattr(data, "nbytes", len(data))
1895 memory = self.buffers[buff.index]
1896 memory[:size] = data
1897 buff.bytesused = size
1898 return buff
1900 def wait_write(self, data: Buffer) -> raw.v4l2_buffer:
1901 device = self.device
1902 if device.io.select is not None: 1902 ↛ 1904line 1902 didn't jump to line 1904 because the condition on line 1902 was always true
1903 _, r, _ = device.io.select((), (device,), ())
1904 return self.raw_write(data)
1906 def write(self, data: Buffer) -> raw.v4l2_buffer:
1907 # first time we check what mode device was opened (blocking vs non-blocking)
1908 # if file was opened with O_NONBLOCK: DQBUF will not block until a buffer
1909 # is available for write. So we need to do it here
1910 if self.device.is_blocking: 1910 ↛ 1911line 1910 didn't jump to line 1911 because the condition on line 1910 was never true
1911 self.write = self.raw_write
1912 else:
1913 self.write = self.wait_write
1914 return self.write(data)
1917class UserPtr(MemorySource):
1918 def __init__(self, buffer_manager: BufferManager):
1919 super().__init__(buffer_manager, Memory.USERPTR)
1921 def prepare_buffers(self):
1922 self.device.log.info("Reserving buffers...")
1923 self.buffer_manager.create_buffers(self.source)
1924 size = self.format.size
1925 self.buffers = []
1926 for index in range(self.buffer_manager.size):
1927 data = ctypes.create_string_buffer(size)
1928 self.buffers.append(data)
1929 buff = raw.v4l2_buffer()
1930 buff.index = index
1931 buff.type = self.buffer_manager.type
1932 buff.memory = self.source
1933 buff.m.userptr = ctypes.addressof(data)
1934 buff.length = size
1935 self.queue.enqueue(buff)
1936 self.device.log.info("Buffers reserved")
1939class MemoryMap(MemorySource):
1940 def __init__(self, buffer_manager: BufferManager):
1941 super().__init__(buffer_manager, Memory.MMAP)
1943 def prepare_buffers(self):
1944 self.device.log.info("Reserving buffers...")
1945 buffers = self.buffer_manager.create_buffers(self.source)
1946 fd = self.device.fileno()
1947 self.buffers = [mmap_from_buffer(fd, buff) for buff in buffers]
1948 self.buffer_manager.enqueue_buffers(Memory.MMAP)
1949 self.format = self.buffer_manager.get_format()
1950 self.device.log.info("Buffers reserved")
1953class EventReader:
1954 def __init__(self, device: Device, max_queue_size=100):
1955 self.device = device
1956 self._loop = None
1957 self._selector = None
1958 self._buffer = None
1959 self._max_queue_size = max_queue_size
1961 async def __aenter__(self):
1962 if self.device.is_blocking: 1962 ↛ 1963line 1962 didn't jump to line 1963 because the condition on line 1962 was never true
1963 raise V4L2Error("Cannot use async event reader on blocking device")
1964 self._buffer = asyncio.Queue(maxsize=self._max_queue_size)
1965 self._selector = select.epoll()
1966 self._loop = asyncio.get_event_loop()
1967 self._loop.add_reader(self._selector.fileno(), self._on_event)
1968 self._selector.register(self.device.fileno(), select.EPOLLPRI)
1969 return self
1971 async def __aexit__(self, exc_type, exc_value, traceback):
1972 self._selector.unregister(self.device.fileno())
1973 self._loop.remove_reader(self._selector.fileno())
1974 self._selector.close()
1975 self._selector = None
1976 self._loop = None
1977 self._buffer = None
1979 async def __aiter__(self):
1980 while True:
1981 yield await self.aread()
1983 def __iter__(self):
1984 while True:
1985 yield self.read()
1987 def __enter__(self):
1988 return self
1990 def __exit__(self, exc_type, exc_value, tb):
1991 pass
1993 def _on_event(self):
1994 task = self._loop.create_future()
1995 try:
1996 self._selector.poll(0) # avoid blocking
1997 event = self.device.deque_event()
1998 task.set_result(event)
1999 except Exception as error:
2000 task.set_exception(error)
2002 buffer = self._buffer
2003 if buffer.full(): 2003 ↛ 2004line 2003 didn't jump to line 2004 because the condition on line 2003 was never true
2004 self.device.log.debug("missed event")
2005 buffer.popleft()
2006 buffer.put_nowait(task)
2008 def read(self, timeout=None):
2009 if not self.device.is_blocking: 2009 ↛ 2013line 2009 didn't jump to line 2013 because the condition on line 2009 was always true
2010 _, _, exc = self.device.io.select((), (), (self.device,), timeout)
2011 if not exc: 2011 ↛ 2012line 2011 didn't jump to line 2012 because the condition on line 2011 was never true
2012 return
2013 return self.device.deque_event()
2015 async def aread(self):
2016 """Wait for next event or return last event in queue"""
2017 task = await self._buffer.get()
2018 return await task
2021class FrameReader:
2022 def __init__(self, device: Device, raw_read: Callable[[], Buffer], max_queue_size: int = 1):
2023 self.device = device
2024 self.raw_read = raw_read
2025 self._loop = None
2026 self._selector = None
2027 self._buffer = None
2028 self._max_queue_size = max_queue_size
2029 self._device_fd = None
2031 async def __aenter__(self) -> Self:
2032 if self.device.is_blocking:
2033 raise V4L2Error("Cannot use async frame reader on blocking device")
2034 self._device_fd = self.device.fileno()
2035 self._buffer = asyncio.Queue(maxsize=self._max_queue_size)
2036 self._selector = select.epoll()
2037 self._loop = asyncio.get_event_loop()
2038 self._loop.add_reader(self._selector.fileno(), self._on_event)
2039 self._selector.register(self._device_fd, select.POLLIN)
2040 return self
2042 async def __aexit__(self, exc_type, exc_value, traceback):
2043 with contextlib.suppress(OSError):
2044 # device may have been closed by now
2045 self._selector.unregister(self._device_fd)
2046 self._loop.remove_reader(self._selector.fileno())
2047 self._selector.close()
2048 self._selector = None
2049 self._loop = None
2050 self._buffer = None
2052 def __enter__(self) -> Self:
2053 return self
2055 def __exit__(self, exc_type, exc_value, tb):
2056 pass
2058 def __iter__(self):
2059 while True:
2060 yield self.read()
2062 def _on_event(self) -> None:
2063 task = self._loop.create_future()
2064 try:
2065 self._selector.poll(0) # avoid blocking
2066 data = self.raw_read()
2067 task.set_result(data)
2068 except Exception as error:
2069 task.set_exception(error)
2071 buffer = self._buffer
2072 if buffer.full():
2073 self.device.log.warn("missed frame")
2074 buffer.get_nowait()
2075 buffer.put_nowait(task)
2077 def read(self, timeout: Optional[float] = None) -> Frame:
2078 if not self.device.is_blocking:
2079 read, _, _ = self.device.io.select((self.device,), (), (), timeout)
2080 if not read: 2080 ↛ 2081line 2080 didn't jump to line 2081 because the condition on line 2080 was never true
2081 return
2082 return self.raw_read()
2084 async def aread(self) -> Frame:
2085 """Wait for next frame or return last frame"""
2086 task = await self._buffer.get()
2087 return await task
2090class BufferQueue:
2091 def __init__(self, buffer_manager: BufferManager, memory: Memory):
2092 self.buffer_manager = buffer_manager
2093 self.memory = memory
2094 self.raw_buffer = None
2096 def enqueue(self, buff: raw.v4l2_buffer):
2097 enqueue_buffer_raw(self.buffer_manager.device.fileno(), buff)
2099 def dequeue(self):
2100 return self.buffer_manager.dequeue_buffer(self.memory)
2102 def __enter__(self) -> raw.v4l2_buffer:
2103 # get next buffer that has some data in it
2104 self.raw_buffer = self.dequeue()
2105 return self.raw_buffer
2107 def __exit__(self, *exc):
2108 # Make a copy of buffer. We need the original buffer that was sent to
2109 # dequeue in to keep frame info like frame number, timestamp, etc
2110 raw_buffer = raw.v4l2_buffer()
2111 memcpy(raw_buffer, self.raw_buffer)
2112 self.enqueue(raw_buffer)
2115class Write(ReentrantOpen):
2116 def __init__(self, buffer_manager: BufferManager):
2117 super().__init__()
2118 self.buffer_manager = buffer_manager
2120 @property
2121 def device(self) -> Device:
2122 return self.buffer_manager.device
2124 def raw_write(self, data: Buffer) -> None:
2125 self.device.write(data)
2127 def wait_write(self, data: Buffer) -> None:
2128 device = self.device
2129 if device.io.select is not None:
2130 _, w, _ = device.io.select((), (device,), ())
2131 if not w:
2132 raise OSError("Closed")
2133 self.raw_write(data)
2135 def write(self, data: Buffer) -> None:
2136 # first time we check what mode device was opened (blocking vs non-blocking)
2137 # if file was opened with O_NONBLOCK: DQBUF will not block until a buffer
2138 # is available for write. So we need to do it here
2139 if self.device.is_blocking:
2140 self.write = self.raw_write
2141 else:
2142 self.write = self.wait_write
2143 return self.write(data)
2145 def open(self) -> None:
2146 pass
2148 def close(self) -> None:
2149 pass
2152class VideoOutput(BufferManager):
2153 def __init__(self, device: Device, size: int = 2, sink: Capability = None):
2154 super().__init__(device, BufferType.VIDEO_OUTPUT, size)
2155 self.buffer = None
2156 self.sink = sink
2158 def __enter__(self) -> Self:
2159 self.open()
2160 return self
2162 def __exit__(self, *exc) -> None:
2163 self.close()
2165 def open(self) -> None:
2166 if self.buffer is not None: 2166 ↛ 2167line 2166 didn't jump to line 2167 because the condition on line 2166 was never true
2167 return
2168 self.device.log.info("Preparing for video output...")
2169 capabilities = self.device.info.capabilities
2170 # Don't check for output capability. Some drivers (ex: v4l2loopback) don't
2171 # report being output capable so that apps like zoom recognize them
2172 # if Capability.VIDEO_OUTPUT not in capabilities:
2173 # raise V4L2Error("device lacks VIDEO_OUTPUT capability")
2174 sink = capabilities if self.sink is None else self.sink
2175 if Capability.STREAMING in sink: 2175 ↛ 2178line 2175 didn't jump to line 2178 because the condition on line 2175 was always true
2176 self.device.log.info("Video output using memory map")
2177 self.buffer = MemoryMap(self)
2178 elif Capability.READWRITE in sink:
2179 self.device.log.info("Video output using write")
2180 self.buffer = Write(self)
2181 else:
2182 raise OSError("Device needs to support STREAMING or READWRITE capability")
2183 self.buffer.open()
2184 self.stream_on()
2185 self.device.log.info("Video output started!")
2187 def close(self) -> None:
2188 if self.buffer: 2188 ↛ exitline 2188 didn't return from function 'close' because the condition on line 2188 was always true
2189 self.device.log.info("Closing video output...")
2190 try:
2191 self.stream_off()
2192 except Exception as error:
2193 self.device.log.warning("Failed to close stream: %r", error)
2194 try:
2195 self.buffer.close()
2196 except Exception as error:
2197 self.device.log.warning("Failed to close buffer: %r", error)
2198 self.buffer = None
2199 self.device.log.info("Video output closed")
2201 def write(self, data: Buffer) -> None:
2202 self.buffer.write(data)
2205def iter_video_files(path: PathLike = "/dev") -> Iterable[Path]:
2206 """Returns an iterator over all video files"""
2207 return iter_device_files(path=path, pattern="video*")
2210def iter_devices(path: PathLike = "/dev", **kwargs) -> Iterable[Device]:
2211 """Returns an iterator over all video devices"""
2212 return (Device(name, **kwargs) for name in iter_video_files(path=path))
2215def iter_video_capture_files(path: PathLike = "/dev") -> Iterable[Path]:
2216 """Returns an iterator over all video files that have CAPTURE capability"""
2218 def filt(filename):
2219 with IO.open(filename) as fobj:
2220 caps = read_capabilities(fobj.fileno())
2221 return Capability.VIDEO_CAPTURE in Capability(caps.device_caps)
2223 return filter(filt, iter_video_files(path))
2226def iter_video_capture_devices(path: PathLike = "/dev", **kwargs) -> Iterable[Device]:
2227 """Returns an iterator over all video devices that have CAPTURE capability"""
2228 return (Device(name, **kwargs) for name in iter_video_capture_files(path))
2231def iter_video_output_files(path: PathLike = "/dev") -> Iterable[Path]:
2232 """
2233 Some drivers (ex: v4l2loopback) don't report being output capable so that
2234 apps like zoom recognize them as valid capture devices so some results might
2235 be missing
2236 """
2238 def filt(filename):
2239 with IO.open(filename) as fobj:
2240 caps = read_capabilities(fobj.fileno())
2241 return Capability.VIDEO_OUTPUT in Capability(caps.device_caps)
2243 return filter(filt, iter_video_files(path))
2246def iter_video_output_devices(path: PathLike = "/dev", **kwargs) -> Iterable[Device]:
2247 """Returns an iterator over all video devices that have VIDEO OUTPUT capability"""
2248 return (Device(name, **kwargs) for name in iter_video_output_files(path))
2251find = make_find(iter_devices)