Coverage for linuxpy/video/device.py: 82%
1564 statements
« prev ^ index » next coverage.py v7.10.4, created at 2026-02-19 15:11 +0100
« prev ^ index » next coverage.py v7.10.4, created at 2026-02-19 15:11 +0100
1#
2# This file is part of the linuxpy project
3#
4# Copyright (c) 2023 Tiago Coutinho
5# Distributed under the GPLv3 license. See LICENSE for more info.
7"""Human friendly interface to V4L2 (Video 4 Linux 2) subsystem."""
9import asyncio
10import collections
11import contextlib
12import copy
13import ctypes
14import enum
15import errno
16import fractions
17import logging
18import mmap
19import os
20import select
21import time
22from collections import UserDict
23from pathlib import Path
25from linuxpy.ctypes import cast, cenum, create_string_buffer, memcpy, string_at
26from linuxpy.device import (
27 BaseDevice,
28 ReentrantOpen,
29 iter_device_files,
30)
31from linuxpy.io import IO
32from linuxpy.ioctl import ioctl
33from linuxpy.types import AsyncIterator, Buffer, Callable, Iterable, Iterator, Optional, PathLike, Self, Union
34from linuxpy.util import astream, bit_indexes, make_find, to_fd
36from . import raw
38log = logging.getLogger(__name__)
39log_mmap = log.getChild("mmap")
42class V4L2Error(Exception):
43 """Video for linux 2 error"""
46def _enum(name, prefix, klass=enum.IntEnum):
47 return klass(
48 name,
49 ((name.replace(prefix, ""), getattr(raw, name)) for name in dir(raw) if name.startswith(prefix)),
50 )
53FrameSizeType = raw.Frmsizetypes
54FrameIntervalType = raw.Frmivaltypes
55Field = raw.Field
56ImageFormatFlag = raw.ImageFormatFlag
57Capability = raw.Capability
58ControlID = raw.ControlID
59ControlFlag = raw.ControlFlag
60ControlType = raw.CtrlType
61ControlClass = raw.ControlClass
62SelectionTarget = raw.SelectionTarget
63EventType = raw.EventType
64EventControlChange = raw.EventControlChange
65IOC = raw.IOC
66BufferType = raw.BufType
67BufferFlag = raw.BufferFlag
68InputType = raw.InputType
69PixelFormat = raw.PixelFormat
70MetaFormat = raw.MetaFormat
71Memory = raw.Memory
72InputStatus = raw.InputStatus
73OutputType = raw.OutputType
74InputCapabilities = raw.InputCapabilities
75OutputCapabilities = raw.OutputCapabilities
76Priority = raw.Priority
77TimeCodeType = raw.TimeCodeType
78TimeCodeFlag = raw.TimeCodeFlag
79EventSubscriptionFlag = raw.EventSubscriptionFlag
80StandardID = raw.StandardID
83def V4L2_CTRL_ID2CLASS(id_):
84 return id_ & 0x0FFF0000 # unsigned long
87def human_pixel_format(ifmt):
88 return "".join(map(chr, ((ifmt >> i) & 0xFF for i in range(0, 4 * 8, 8))))
91PixelFormat.human_str = lambda self: human_pixel_format(self.value)
92MetaFormat.human_str = lambda self: human_pixel_format(self.value)
95ImageFormat = collections.namedtuple("ImageFormat", "type description flags pixel_format")
97MetaFmt = collections.namedtuple("MetaFmt", "format max_buffer_size width height bytes_per_line")
99Format = collections.namedtuple("Format", "width height pixel_format size bytes_per_line")
101CropCapability = collections.namedtuple("CropCapability", "type bounds defrect pixel_aspect")
103Rect = collections.namedtuple("Rect", "left top width height")
105Size = collections.namedtuple("Size", "width height")
107FrameType = collections.namedtuple("FrameType", "type pixel_format width height min_fps max_fps step_fps")
109Input = collections.namedtuple("InputType", "index name type audioset tuner std status capabilities")
111Output = collections.namedtuple("OutputType", "index name type audioset modulator std capabilities")
113Standard = collections.namedtuple("Standard", "index id name frameperiod framelines")
115FrameSize = collections.namedtuple("FrameSize", "index pixel_format type info")
118CROP_BUFFER_TYPES = {
119 BufferType.VIDEO_CAPTURE,
120 BufferType.VIDEO_CAPTURE_MPLANE,
121 BufferType.VIDEO_OUTPUT,
122 BufferType.VIDEO_OUTPUT_MPLANE,
123 BufferType.VIDEO_OVERLAY,
124}
126IMAGE_FORMAT_BUFFER_TYPES = {
127 BufferType.VIDEO_CAPTURE,
128 BufferType.VIDEO_CAPTURE_MPLANE,
129 BufferType.VIDEO_OUTPUT,
130 BufferType.VIDEO_OUTPUT_MPLANE,
131 BufferType.VIDEO_OVERLAY,
132 BufferType.META_CAPTURE,
133 BufferType.META_OUTPUT,
134}
137def mem_map(fd, length, offset):
138 log_mmap.debug("%s, length=%d, offset=%d", fd, length, offset)
139 return mmap.mmap(to_fd(fd), length, offset=offset)
142def flag_items(flag):
143 return [item for item in type(flag) if item in flag]
146def raw_crop_caps_to_crop_caps(crop):
147 return CropCapability(
148 type=BufferType(crop.type),
149 bounds=Rect(
150 crop.bounds.left,
151 crop.bounds.top,
152 crop.bounds.width,
153 crop.bounds.height,
154 ),
155 defrect=Rect(
156 crop.defrect.left,
157 crop.defrect.top,
158 crop.defrect.width,
159 crop.defrect.height,
160 ),
161 pixel_aspect=crop.pixelaspect.numerator / crop.pixelaspect.denominator,
162 )
165CropCapability.from_raw = raw_crop_caps_to_crop_caps
168def raw_read_crop_capabilities(fd, buffer_type: BufferType) -> raw.v4l2_cropcap:
169 crop = raw.v4l2_cropcap()
170 crop.type = buffer_type
171 return ioctl(fd, IOC.CROPCAP, crop)
174def read_crop_capabilities(fd, buffer_type: BufferType) -> CropCapability:
175 try:
176 crop = raw_read_crop_capabilities(fd, buffer_type)
177 except OSError as error:
178 if error.errno == errno.ENODATA:
179 return None
180 raise
181 return raw_crop_caps_to_crop_caps(crop)
184ITER_BREAK = (errno.ENOTTY, errno.ENODATA, errno.EPIPE)
187def iter_read(fd, ioc, indexed_struct, start=0, stop=128, step=1, ignore_einval=False):
188 for index in range(start, stop, step):
189 indexed_struct.index = index
190 try:
191 ioctl(fd, ioc, indexed_struct)
192 yield indexed_struct
193 except OSError as error:
194 if error.errno == errno.EINVAL:
195 if ignore_einval:
196 continue
197 else:
198 break
199 elif error.errno in ITER_BREAK:
200 break
201 else:
202 raise
205def iter_read_frame_intervals(fd, fmt, w, h):
206 value = raw.v4l2_frmivalenum()
207 value.pixel_format = fmt
208 value.width = w
209 value.height = h
210 count = 0
211 for val in iter_read(fd, IOC.ENUM_FRAMEINTERVALS, value):
212 # values come in frame interval (fps = 1/interval)
213 try:
214 ftype = FrameIntervalType(val.type)
215 except ValueError:
216 break
217 if ftype == FrameIntervalType.DISCRETE:
218 min_fps = max_fps = step_fps = fractions.Fraction(val.discrete.denominator / val.discrete.numerator)
219 else:
220 if val.stepwise.min.numerator == 0:
221 min_fps = 0
222 else:
223 min_fps = fractions.Fraction(val.stepwise.min.denominator, val.stepwise.min.numerator)
224 if val.stepwise.max.numerator == 0:
225 max_fps = 0
226 else:
227 max_fps = fractions.Fraction(val.stepwise.max.denominator, val.stepwise.max.numerator)
228 if val.stepwise.step.numerator == 0:
229 step_fps = 0
230 else:
231 step_fps = fractions.Fraction(val.stepwise.step.denominator, val.stepwise.step.numerator)
232 yield FrameType(
233 type=ftype,
234 pixel_format=fmt,
235 width=w,
236 height=h,
237 min_fps=min_fps,
238 max_fps=max_fps,
239 step_fps=step_fps,
240 )
241 count += 1
242 if not count:
243 # If it wasn't possible to get frame interval, report discovered frame size anyway
244 yield FrameType(
245 type=FrameIntervalType.DISCRETE,
246 pixel_format=fmt,
247 width=w,
248 height=h,
249 min_fps=0,
250 max_fps=0,
251 step_fps=0,
252 )
255def iter_read_frame_sizes(fd, pixel_format):
256 size = raw.v4l2_frmsizeenum()
257 size.index = 0
258 size.pixel_format = pixel_format
259 for val in iter_read(fd, IOC.ENUM_FRAMESIZES, size):
260 type = FrameSizeType(val.type)
261 if type == FrameSizeType.DISCRETE:
262 info = val.m1.discrete
263 else:
264 info = val.m1.stepwise
265 yield FrameSize(val.index, PixelFormat(val.pixel_format), FrameSizeType(val.type), copy.copy(info))
268def iter_read_pixel_formats_frame_intervals(fd, pixel_formats):
269 for pixel_format in pixel_formats:
270 for size in iter_read_frame_sizes(fd, pixel_format):
271 if size.type == FrameSizeType.DISCRETE:
272 yield from iter_read_frame_intervals(fd, pixel_format, size.info.width, size.info.height)
275def read_capabilities(fd):
276 caps = raw.v4l2_capability()
277 ioctl(fd, IOC.QUERYCAP, caps)
278 return caps
281def iter_read_formats(fd, type):
282 format = raw.v4l2_fmtdesc()
283 format.type = type
284 pixel_formats = set(PixelFormat)
285 meta_formats = set(MetaFormat)
286 for fmt in iter_read(fd, IOC.ENUM_FMT, format):
287 pixel_fmt = fmt.pixelformat
288 if type in {BufferType.VIDEO_CAPTURE, BufferType.VIDEO_OUTPUT}:
289 if pixel_fmt not in pixel_formats:
290 log.warning(
291 "ignored unknown pixel format %s (%d)",
292 human_pixel_format(pixel_fmt),
293 pixel_fmt,
294 )
295 continue
296 pixel_format = PixelFormat(pixel_fmt)
297 elif type in {BufferType.META_CAPTURE, BufferType.META_OUTPUT}:
298 if pixel_fmt not in meta_formats:
299 log.warning(
300 "ignored unknown meta format %s (%d)",
301 human_pixel_format(pixel_fmt),
302 pixel_fmt,
303 )
304 continue
305 pixel_format = MetaFormat(pixel_fmt)
306 image_format = ImageFormat(
307 type=type,
308 flags=ImageFormatFlag(fmt.flags),
309 description=fmt.description.decode(),
310 pixel_format=pixel_format,
311 )
312 yield image_format
315def iter_read_inputs(fd):
316 input = raw.v4l2_input()
317 for inp in iter_read(fd, IOC.ENUMINPUT, input):
318 input_type = Input(
319 index=inp.index,
320 name=inp.name.decode(),
321 type=InputType(inp.type),
322 audioset=bit_indexes(inp.audioset),
323 tuner=inp.tuner,
324 std=StandardID(inp.std),
325 status=InputStatus(inp.status),
326 capabilities=InputCapabilities(inp.capabilities),
327 )
328 yield input_type
331def iter_read_outputs(fd):
332 output = raw.v4l2_output()
333 for out in iter_read(fd, IOC.ENUMOUTPUT, output):
334 output_type = Output(
335 index=out.index,
336 name=out.name.decode(),
337 type=OutputType(out.type),
338 audioset=bit_indexes(out.audioset),
339 modulator=out.modulator,
340 std=StandardID(out.std),
341 capabilities=OutputCapabilities(out.capabilities),
342 )
343 yield output_type
346def iter_read_video_standards(fd):
347 std = raw.v4l2_standard()
348 for item in iter_read(fd, IOC.ENUMSTD, std):
349 period = item.frameperiod
350 yield Standard(
351 index=item.index,
352 id=StandardID(item.id),
353 name=item.name.decode(),
354 frameperiod=fractions.Fraction(period.denominator, period.numerator),
355 framelines=item.framelines,
356 )
359def iter_read_controls(fd):
360 ctrl = raw.v4l2_query_ext_ctrl()
361 nxt = ControlFlag.NEXT_CTRL | ControlFlag.NEXT_COMPOUND
362 ctrl.id = nxt
363 for ctrl_ext in iter_read(fd, IOC.QUERY_EXT_CTRL, ctrl):
364 yield copy.deepcopy(ctrl_ext)
365 ctrl_ext.id |= nxt
368def iter_read_menu(fd, ctrl):
369 qmenu = raw.v4l2_querymenu()
370 qmenu.id = ctrl.id
371 for menu in iter_read(
372 fd,
373 IOC.QUERYMENU,
374 qmenu,
375 start=ctrl._info.minimum,
376 stop=ctrl._info.maximum + 1,
377 step=ctrl._info.step,
378 ignore_einval=True,
379 ):
380 yield copy.deepcopy(menu)
383def query_buffer(fd, buffer_type: BufferType, memory: Memory, index: int) -> raw.v4l2_buffer:
384 buff = raw.v4l2_buffer()
385 buff.type = buffer_type
386 buff.memory = memory
387 buff.index = index
388 buff.reserved = 0
389 ioctl(fd, IOC.QUERYBUF, buff)
390 return buff
393def enqueue_buffer_raw(fd, buff: raw.v4l2_buffer) -> raw.v4l2_buffer:
394 if not buff.timestamp.secs:
395 buff.timestamp.set_ns()
396 ioctl(fd, IOC.QBUF, buff)
397 return buff
400def enqueue_buffer(fd, buffer_type: BufferType, memory: Memory, size: int, index: int) -> raw.v4l2_buffer:
401 buff = raw.v4l2_buffer()
402 buff.type = buffer_type
403 buff.memory = memory
404 buff.bytesused = size
405 buff.index = index
406 buff.field = Field.NONE
407 buff.reserved = 0
408 return enqueue_buffer_raw(fd, buff)
411def dequeue_buffer(fd, buffer_type: BufferType, memory: Memory) -> raw.v4l2_buffer:
412 buff = raw.v4l2_buffer()
413 buff.type = buffer_type
414 buff.memory = memory
415 buff.index = 0
416 buff.reserved = 0
417 ioctl(fd, IOC.DQBUF, buff)
418 return buff
421def request_buffers(fd, buffer_type: BufferType, memory: Memory, count: int) -> raw.v4l2_requestbuffers:
422 req = raw.v4l2_requestbuffers()
423 req.type = buffer_type
424 req.memory = memory
425 req.count = count
426 ioctl(fd, IOC.REQBUFS, req)
427 if not req.count:
428 raise OSError("Not enough buffer memory")
429 return req
432def free_buffers(fd, buffer_type: BufferType, memory: Memory) -> raw.v4l2_requestbuffers:
433 req = raw.v4l2_requestbuffers()
434 req.type = buffer_type
435 req.memory = memory
436 req.count = 0
437 ioctl(fd, IOC.REQBUFS, req)
438 return req
441def export_buffer(fd, buffer_type: BufferType, index: int) -> int:
442 req = raw.v4l2_exportbuffer(type=buffer_type, index=index)
443 return ioctl(fd, IOC.EXPBUF, req).fd
446def create_buffers(fd, format: raw.v4l2_format, memory: Memory, count: int) -> raw.v4l2_create_buffers:
447 """Create buffers for Memory Mapped or User Pointer or DMA Buffer I/O"""
448 req = raw.v4l2_create_buffers()
449 req.format = format
450 req.memory = memory
451 req.count = count
452 ioctl(fd, IOC.CREATE_BUFS, req)
453 if not req.count:
454 raise OSError("Not enough buffer memory")
455 return req
458def set_raw_format(fd, fmt: raw.v4l2_format):
459 return ioctl(fd, IOC.S_FMT, fmt)
462def set_format(fd, buffer_type: BufferType, width: int, height: int, pixel_format: str = "MJPG"):
463 fmt = raw.v4l2_format()
464 if isinstance(pixel_format, str):
465 pixel_format = raw.v4l2_fourcc(*pixel_format)
466 fmt.type = buffer_type
467 fmt.fmt.pix.pixelformat = pixel_format
468 fmt.fmt.pix.field = Field.ANY
469 fmt.fmt.pix.width = width
470 fmt.fmt.pix.height = height
471 fmt.fmt.pix.bytesperline = 0
472 fmt.fmt.pix.sizeimage = 0
473 return set_raw_format(fd, fmt)
476def get_raw_format(fd, buffer_type) -> raw.v4l2_format:
477 fmt = raw.v4l2_format()
478 fmt.type = buffer_type
479 ioctl(fd, IOC.G_FMT, fmt)
480 return fmt
483def get_format(fd, buffer_type) -> Union[Format, MetaFmt]:
484 f = get_raw_format(fd, buffer_type)
485 if buffer_type in {BufferType.META_CAPTURE, BufferType.META_OUTPUT}:
486 return MetaFmt(
487 format=MetaFormat(f.fmt.meta.dataformat),
488 max_buffer_size=f.fmt.meta.buffersize,
489 width=f.fmt.meta.width,
490 height=f.fmt.meta.height,
491 bytes_per_line=f.fmt.meta.bytesperline,
492 )
493 return Format(
494 width=f.fmt.pix.width,
495 height=f.fmt.pix.height,
496 pixel_format=PixelFormat(f.fmt.pix.pixelformat),
497 size=f.fmt.pix.sizeimage,
498 bytes_per_line=f.fmt.pix.bytesperline,
499 )
502def try_raw_format(fd, fmt: raw.v4l2_format):
503 ioctl(fd, IOC.TRY_FMT, fmt)
506def try_format(fd, buffer_type: BufferType, width: int, height: int, pixel_format: str = "MJPG"):
507 fmt = raw.v4l2_format()
508 if isinstance(pixel_format, str):
509 pixel_format = raw.v4l2_fourcc(*pixel_format)
510 fmt.type = buffer_type
511 fmt.fmt.pix.pixelformat = pixel_format
512 fmt.fmt.pix.field = Field.ANY
513 fmt.fmt.pix.width = width
514 fmt.fmt.pix.height = height
515 fmt.fmt.pix.bytesperline = 0
516 fmt.fmt.pix.sizeimage = 0
517 return try_raw_format(fd, fmt)
520def get_parm(fd, buffer_type):
521 p = raw.v4l2_streamparm()
522 p.type = buffer_type
523 ioctl(fd, IOC.G_PARM, p)
524 return p
527def set_fps(fd, buffer_type, fps):
528 # v4l2 fraction is u32
529 max_denominator = int(min(2**32, 2**32 / fps))
530 p = raw.v4l2_streamparm()
531 p.type = buffer_type
532 fps = fractions.Fraction(fps).limit_denominator(max_denominator)
533 if buffer_type == BufferType.VIDEO_CAPTURE:
534 p.parm.capture.timeperframe.numerator = fps.denominator
535 p.parm.capture.timeperframe.denominator = fps.numerator
536 elif buffer_type == BufferType.VIDEO_OUTPUT:
537 p.parm.output.timeperframe.numerator = fps.denominator
538 p.parm.output.timeperframe.denominator = fps.numerator
539 else:
540 raise ValueError(f"Unsupported buffer type {buffer_type!r}")
541 return ioctl(fd, IOC.S_PARM, p)
544def get_fps(fd, buffer_type):
545 p = get_parm(fd, buffer_type)
546 if buffer_type == BufferType.VIDEO_CAPTURE:
547 parm = p.parm.capture
548 elif buffer_type == BufferType.VIDEO_OUTPUT:
549 parm = p.parm.output
550 else:
551 raise ValueError(f"Unsupported buffer type {buffer_type!r}")
552 return fractions.Fraction(parm.timeperframe.denominator, parm.timeperframe.numerator)
555def stream_on(fd, buffer_type):
556 btype = cenum(buffer_type)
557 return ioctl(fd, IOC.STREAMON, btype)
560def stream_off(fd, buffer_type):
561 btype = cenum(buffer_type)
562 return ioctl(fd, IOC.STREAMOFF, btype)
565def set_selection(fd, buffer_type, target, rectangle):
566 sel = raw.v4l2_selection()
567 sel.type = buffer_type
568 sel.target = target
569 sel.r.left = rectangle.left
570 sel.r.top = rectangle.top
571 sel.r.width = rectangle.width
572 sel.r.height = rectangle.height
573 ioctl(fd, IOC.S_SELECTION, sel)
576def get_selection(
577 fd,
578 buffer_type: BufferType,
579 target: SelectionTarget = SelectionTarget.CROP,
580) -> Rect:
581 sel = raw.v4l2_selection()
582 sel.type = buffer_type
583 sel.target = target
584 ioctl(fd, IOC.G_SELECTION, sel)
585 return Rect(left=sel.r.left, top=sel.r.top, width=sel.r.width, height=sel.r.height)
588def get_control(fd, id):
589 control = raw.v4l2_control(id)
590 ioctl(fd, IOC.G_CTRL, control)
591 return control.value
594CTRL_TYPE_CTYPE_ARRAY = {
595 ControlType.U8: ctypes.c_uint8,
596 ControlType.U16: ctypes.c_uint16,
597 ControlType.U32: ctypes.c_uint32,
598 ControlType.INTEGER: ctypes.c_int,
599 ControlType.INTEGER64: ctypes.c_int64,
600}
603CTRL_TYPE_CTYPE_STRUCT = {
604 # ControlType.AREA: raw.v4l2_area,
605}
608def _struct_for_ctrl_type(ctrl_type):
609 ctrl_type = ControlType(ctrl_type).name.lower()
610 name = f"v4l2_ctrl_{ctrl_type}"
611 try:
612 return getattr(raw, name)
613 except AttributeError:
614 name = f"v4l2_{ctrl_type}"
615 return getattr(raw, name)
618def get_ctrl_type_struct(ctrl_type):
619 struct = CTRL_TYPE_CTYPE_STRUCT.get(ctrl_type)
620 if struct is None:
621 struct = _struct_for_ctrl_type(ctrl_type)
622 CTRL_TYPE_CTYPE_STRUCT[ctrl_type] = struct
623 return struct
626def convert_to_ctypes_array(lst, depth, ctype):
627 """Convert a list (arbitrary depth) to a ctypes array."""
628 if depth == 1:
629 return (ctype * len(lst))(*lst)
631 # Recursive case: we need to process the sub-lists first
632 sub_arrays = [convert_to_ctypes_array(sub_lst, depth - 1, ctype) for sub_lst in lst]
633 array_type = len(sub_arrays) * type(sub_arrays[0]) # Create the array type
634 return array_type(*sub_arrays)
637def _prepare_read_control_value(control: raw.v4l2_query_ext_ctrl, raw_control: raw.v4l2_ext_control):
638 raw_control.id = control.id
639 has_payload = ControlFlag.HAS_PAYLOAD in ControlFlag(control.flags)
640 if has_payload:
641 if control.type == ControlType.STRING:
642 size = control.maximum + 1
643 payload = ctypes.create_string_buffer(size)
644 raw_control.string = payload
645 raw_control.size = size
646 else:
647 ctype = CTRL_TYPE_CTYPE_ARRAY.get(control.type)
648 raw_control.size = control.elem_size * control.elems
649 if ctype is None:
650 ctype = get_ctrl_type_struct(control.type)
651 payload = ctype()
652 raw_control.ptr = ctypes.cast(ctypes.pointer(payload), ctypes.c_void_p)
653 else:
654 for i in range(control.nr_of_dims):
655 ctype *= control.dims[i]
656 payload = ctype()
657 raw_control.size = control.elem_size * control.elems
658 raw_control.ptr = ctypes.cast(payload, ctypes.c_void_p)
659 return payload
662def _get_control_value(control: raw.v4l2_query_ext_ctrl, raw_control: raw.v4l2_ext_control, data):
663 if data is None:
664 if control.type == ControlType.INTEGER64:
665 return raw_control.value64
666 return raw_control.value
667 else:
668 if control.type == ControlType.STRING:
669 return data.value.decode()
670 return data
673def get_controls_values(fd, controls: list[raw.v4l2_query_ext_ctrl], which=raw.ControlWhichValue.CUR_VAL, request_fd=0):
674 n = len(controls)
675 ctrls = raw.v4l2_ext_controls()
676 ctrls.which = which
677 ctrls.count = n
678 ctrls.request_fd = request_fd
679 ctrls.controls = (n * raw.v4l2_ext_control)()
680 values = [_prepare_read_control_value(*args) for args in zip(controls, ctrls.controls, strict=False)]
681 ioctl(fd, IOC.G_EXT_CTRLS, ctrls)
682 return [_get_control_value(*args) for args in zip(controls, ctrls.controls, values, strict=False)]
685def set_control(fd, id, value):
686 control = raw.v4l2_control(id, value)
687 ioctl(fd, IOC.S_CTRL, control)
690def _prepare_write_controls_values(control: raw.v4l2_query_ext_ctrl, value: object, raw_control: raw.v4l2_ext_control):
691 raw_control.id = control.id
692 has_payload = ControlFlag.HAS_PAYLOAD in ControlFlag(control.flags)
693 if has_payload:
694 if control.type == ControlType.STRING:
695 raw_control.string = ctypes.create_string_buffer(value.encode())
696 raw_control.size = len(value) + 1
697 else:
698 array_type = CTRL_TYPE_CTYPE_ARRAY.get(control.type)
699 raw_control.size = control.elem_size * control.elems
700 # a struct: assume value is proper raw struct
701 if array_type is None:
702 value = ctypes.pointer(value)
703 else:
704 value = convert_to_ctypes_array(value, control.nr_of_dims, array_type)
705 ptr = ctypes.cast(value, ctypes.c_void_p)
706 raw_control.ptr = ptr
707 else:
708 if control.type == ControlType.INTEGER64:
709 raw_control.value64 = value
710 else:
711 raw_control.value = value
714def set_controls_values(
715 fd, controls_values: list[tuple[raw.v4l2_query_ext_ctrl, object]], which=raw.ControlWhichValue.CUR_VAL, request_fd=0
716):
717 n = len(controls_values)
718 ctrls = raw.v4l2_ext_controls()
719 ctrls.which = which
720 ctrls.count = n
721 ctrls.request_fd = request_fd
722 ctrls.controls = (n * raw.v4l2_ext_control)()
723 for (control, value), raw_control in zip(controls_values, ctrls.controls, strict=False):
724 _prepare_write_controls_values(control, value, raw_control)
725 ioctl(fd, IOC.S_EXT_CTRLS, ctrls)
728def get_priority(fd) -> Priority:
729 priority = ctypes.c_uint()
730 ioctl(fd, IOC.G_PRIORITY, priority)
731 return Priority(priority.value)
734def set_priority(fd, priority: Priority):
735 priority = ctypes.c_uint(priority.value)
736 ioctl(fd, IOC.S_PRIORITY, priority)
739def subscribe_event(
740 fd,
741 event_type: EventType,
742 id: int = 0,
743 flags: EventSubscriptionFlag = 0,
744):
745 sub = raw.v4l2_event_subscription()
746 sub.type = event_type
747 sub.id = id
748 sub.flags = flags
749 ioctl(fd, IOC.SUBSCRIBE_EVENT, sub)
752def unsubscribe_event(fd, event_type: EventType = EventType.ALL, id: int = 0):
753 sub = raw.v4l2_event_subscription()
754 sub.type = event_type
755 sub.id = id
756 ioctl(fd, IOC.UNSUBSCRIBE_EVENT, sub)
759def deque_event(fd) -> raw.v4l2_event:
760 event = raw.v4l2_event()
761 return ioctl(fd, IOC.DQEVENT, event)
764def set_edid(fd, edid):
765 if len(edid) % 128:
766 raise ValueError(f"EDID length {len(edid)} is not multiple of 128")
767 edid_struct = raw.v4l2_edid()
768 edid_struct.pad = 0
769 edid_struct.start_block = 0
770 edid_struct.blocks = len(edid) // 128
771 edid_array = create_string_buffer(edid)
772 edid_struct.edid = cast(edid_array, type(edid_struct.edid))
773 ioctl(fd, IOC.S_EDID, edid_struct)
776def clear_edid(fd):
777 set_edid(fd, b"")
780def get_edid(fd):
781 edid_struct = raw.v4l2_edid()
782 ioctl(fd, IOC.G_EDID, edid_struct)
783 if edid_struct.blocks == 0:
784 return b""
785 edid_len = 128 * edid_struct.blocks
786 edid_array = create_string_buffer(b"\0" * edid_len)
787 edid_struct.edid = cast(edid_array, type(edid_struct.edid))
788 ioctl(fd, IOC.G_EDID, edid_struct)
789 return string_at(edid_struct.edid, edid_len)
792def get_input(fd):
793 inp = ctypes.c_uint()
794 ioctl(fd, IOC.G_INPUT, inp)
795 return inp.value
798def set_input(fd, index: int):
799 index = ctypes.c_uint(index)
800 ioctl(fd, IOC.S_INPUT, index)
803def get_output(fd):
804 out = ctypes.c_uint()
805 ioctl(fd, IOC.G_OUTPUT, out)
806 return out.value
809def set_output(fd, index: int):
810 index = ctypes.c_uint(index)
811 ioctl(fd, IOC.S_OUTPUT, index)
814def get_std(fd) -> StandardID:
815 out = ctypes.c_uint64()
816 ioctl(fd, IOC.G_STD, out)
817 return StandardID(out.value)
820def set_std(fd, std):
821 ioctl(fd, IOC.S_STD, std)
824def query_std(fd) -> StandardID:
825 out = ctypes.c_uint64()
826 ioctl(fd, IOC.QUERYSTD, out)
827 return StandardID(out.value)
830SubdevFormat = collections.namedtuple(
831 "SubdevFormat", "pad which width height code field colorspace quantization xfer_func flags stream"
832)
835def _translate_subdev_format(fmt: raw.v4l2_subdev_format):
836 return SubdevFormat(
837 pad=fmt.pad,
838 which=raw.SubdevFormatWhence(fmt.which),
839 width=fmt.format.width,
840 height=fmt.format.height,
841 code=raw.MbusPixelcode(fmt.format.code),
842 field=raw.Field(fmt.format.field),
843 colorspace=raw.Colorspace(fmt.format.colorspace),
844 quantization=raw.Quantization(fmt.format.quantization),
845 xfer_func=raw.XferFunc(fmt.format.xfer_func),
846 flags=raw.MbusFrameFormatFlag(fmt.format.flags),
847 stream=fmt.stream,
848 )
851def get_subdevice_format(fd, pad: int = 0) -> raw.v4l2_subdev_format:
852 fmt = raw.v4l2_subdev_format(pad=pad, which=raw.SubdevFormatWhence.ACTIVE)
853 return _translate_subdev_format(ioctl(fd, IOC.SUBDEV_G_FMT, fmt))
856# Helpers
859def request_and_query_buffer(fd, buffer_type: BufferType, memory: Memory) -> raw.v4l2_buffer:
860 """request + query buffers"""
861 buffers = request_and_query_buffers(fd, buffer_type, memory, 1)
862 return buffers[0]
865def request_and_query_buffers(fd, buffer_type: BufferType, memory: Memory, count: int) -> list[raw.v4l2_buffer]:
866 """request + query buffers"""
867 request_buffers(fd, buffer_type, memory, count)
868 return [query_buffer(fd, buffer_type, memory, index) for index in range(count)]
871def mmap_from_buffer(fd, buff: raw.v4l2_buffer) -> mmap.mmap:
872 return mem_map(fd, buff.length, offset=buff.m.offset)
875def create_mmap_buffers(fd, buffer_type: BufferType, memory: Memory, count: int) -> list[mmap.mmap]:
876 """create buffers + mmap_from_buffer"""
877 return [mmap_from_buffer(fd, buff) for buff in request_and_query_buffers(fd, buffer_type, memory, count)]
880def create_mmap_buffer(fd, buffer_type: BufferType, memory: Memory) -> mmap.mmap:
881 return create_mmap_buffers(fd, buffer_type, memory, 1)
884def enqueue_buffers(fd, buffer_type: BufferType, memory: Memory, count: int) -> list[raw.v4l2_buffer]:
885 return [enqueue_buffer(fd, buffer_type, memory, 0, index) for index in range(count)]
888class Device(BaseDevice):
889 PREFIX = "/dev/video"
891 def __init__(self, name_or_file, read_write=True, io=IO, blocking=False):
892 self.info = None
893 self.controls = None
894 super().__init__(name_or_file, read_write=read_write, io=io, blocking=blocking)
896 def __iter__(self):
897 with VideoCapture(self) as stream:
898 yield from stream
900 async def __aiter__(self):
901 with VideoCapture(self) as stream:
902 async for frame in stream:
903 yield frame
905 def _on_open(self):
906 self.info = InfoEx(self)
907 self.controls = Controls.from_device(self)
909 def query_buffer(self, buffer_type, memory, index):
910 return query_buffer(self, buffer_type, memory, index)
912 def enqueue_buffer(self, buffer_type: BufferType, memory: Memory, size: int, index: int) -> raw.v4l2_buffer:
913 return enqueue_buffer(self, buffer_type, memory, size, index)
915 def dequeue_buffer(self, buffer_type: BufferType, memory: Memory) -> raw.v4l2_buffer:
916 return dequeue_buffer(self, buffer_type, memory)
918 def request_buffers(self, buffer_type, memory, size):
919 return request_buffers(self, buffer_type, memory, size)
921 def create_buffers(self, buffer_type: BufferType, memory: Memory, count: int) -> list[raw.v4l2_buffer]:
922 return request_and_query_buffers(self, buffer_type, memory, count)
924 def free_buffers(self, buffer_type, memory):
925 return free_buffers(self, buffer_type, memory)
927 def enqueue_buffers(self, buffer_type: BufferType, memory: Memory, count: int) -> list[raw.v4l2_buffer]:
928 return enqueue_buffers(self, buffer_type, memory, count)
930 def set_format(
931 self,
932 buffer_type: BufferType,
933 width: int,
934 height: int,
935 pixel_format: str = "MJPG",
936 ):
937 return set_format(self, buffer_type, width, height, pixel_format=pixel_format)
939 def get_format(self, buffer_type) -> Format:
940 return get_format(self, buffer_type)
942 def set_fps(self, buffer_type, fps):
943 return set_fps(self, buffer_type, fps)
945 def get_fps(self, buffer_type):
946 return get_fps(self, buffer_type)
948 def set_selection(self, buffer_type, target, rectangle):
949 return set_selection(self, buffer_type, target, rectangle)
951 def get_selection(self, buffer_type, target):
952 return get_selection(self, buffer_type, target)
954 def get_priority(self) -> Priority:
955 return get_priority(self)
957 def set_priority(self, priority: Priority):
958 set_priority(self, priority)
960 def stream_on(self, buffer_type):
961 stream_on(self, buffer_type)
963 def stream_off(self, buffer_type):
964 stream_off(self, buffer_type)
966 def write(self, data: bytes) -> None:
967 self._fobj.write(data)
969 def subscribe_event(
970 self,
971 event_type: EventType = EventType.ALL,
972 id: int = 0,
973 flags: EventSubscriptionFlag = 0,
974 ):
975 return subscribe_event(self, event_type, id, flags)
977 def unsubscribe_event(self, event_type: EventType = EventType.ALL, id: int = 0):
978 return unsubscribe_event(self, event_type, id)
980 def deque_event(self):
981 return deque_event(self)
983 def set_edid(self, edid):
984 set_edid(self, edid)
986 def clear_edid(self):
987 clear_edid(self)
989 def get_edid(self):
990 return get_edid(self)
992 def get_input(self):
993 return get_input(self)
995 def set_input(self, index: int):
996 return set_input(self, index)
998 def get_output(self):
999 return get_output(self)
1001 def set_output(self, index: int):
1002 return set_output(self, index)
1004 def get_std(self) -> StandardID:
1005 return get_std(self)
1007 def set_std(self, std):
1008 return set_std(self, std)
1010 def query_std(self) -> StandardID:
1011 return query_std(self)
1013 def clone(self) -> Self:
1014 fd = os.dup(self.fileno())
1015 fobj = self.io.os.fdopen(fd, "rb+", buffering=0)
1016 fobj.name = self._fobj.name
1017 return type(self)(fobj)
1020class SubDevice(BaseDevice):
1021 def get_format(self, pad: int = 0) -> SubdevFormat:
1022 return get_subdevice_format(self, pad=pad)
1025def create_artificial_control_class(class_id):
1026 return raw.v4l2_query_ext_ctrl(
1027 id=class_id | 1,
1028 name=b"Generic Controls",
1029 type=ControlType.CTRL_CLASS,
1030 )
1033class Controls(dict):
1034 def __init__(self, device: Device):
1035 super().__init__()
1036 self.__dict__["_device"] = device
1037 self.__dict__["_initialized"] = False
1039 def _init_if_needed(self):
1040 if not self._initialized:
1041 self.refresh()
1042 self.__dict__["_initialized"] = True
1044 def __repr__(self):
1045 self._init_if_needed()
1046 return super().__repr__()
1048 def __getitem__(self, name):
1049 self._init_if_needed()
1050 return super().__getitem__(name)
1052 def __len__(self):
1053 self._init_if_needed()
1054 return super().__len__()
1056 def refresh(self):
1057 ctrl_type_map = {
1058 ControlType.BOOLEAN: BooleanControl,
1059 ControlType.INTEGER: IntegerControl,
1060 ControlType.INTEGER64: Integer64Control,
1061 ControlType.MENU: MenuControl,
1062 ControlType.INTEGER_MENU: MenuControl,
1063 ControlType.U8: U8Control,
1064 ControlType.U16: U16Control,
1065 ControlType.U32: U32Control,
1066 ControlType.BUTTON: ButtonControl,
1067 }
1068 classes = {}
1069 for ctrl in self._device.info.controls:
1070 ctrl_type = ControlType(ctrl.type)
1071 ctrl_class_id = V4L2_CTRL_ID2CLASS(ctrl.id)
1072 if ctrl_type == ControlType.CTRL_CLASS:
1073 classes[ctrl_class_id] = ctrl
1074 else:
1075 klass = classes.get(ctrl_class_id)
1076 if klass is None:
1077 klass = create_artificial_control_class(ctrl_class_id)
1078 classes[ctrl_class_id] = klass
1079 has_payload = ControlFlag.HAS_PAYLOAD in ControlFlag(ctrl.flags)
1080 if has_payload:
1081 ctrl_class = CompoundControl
1082 else:
1083 ctrl_class = ctrl_type_map.get(ctrl_type, GenericControl)
1084 self[ctrl.id] = ctrl_class(self._device, ctrl, klass)
1086 @classmethod
1087 def from_device(cls, device):
1088 """Deprecated: backward compatible. Please use Controls(device) constructor directly"""
1089 return cls(device)
1091 def __getattr__(self, key):
1092 with contextlib.suppress(KeyError):
1093 return self[key]
1094 raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{key}'")
1096 def __setattr__(self, key, value):
1097 self._init_if_needed()
1098 self[key] = value
1100 def __missing__(self, key):
1101 self._init_if_needed()
1102 for v in self.values():
1103 if isinstance(v, BaseControl) and (v.config_name == key):
1104 return v
1105 raise KeyError(key)
1107 def items(self):
1108 self._init_if_needed()
1109 return super().items()
1111 def values(self):
1112 self._init_if_needed()
1113 return super().values()
1115 def used_classes(self):
1116 class_map = {v.control_class.id: v.control_class for v in self.values() if isinstance(v, BaseControl)}
1117 return list(class_map.values())
1119 def with_class(self, control_class):
1120 if isinstance(control_class, str):
1121 control_class = ControlClass[control_class.upper()]
1122 elif isinstance(control_class, int):
1123 control_class = ControlClass(control_class)
1124 elif not isinstance(control_class, ControlClass):
1125 control_class = ControlClass(control_class.id - 1)
1126 for v in self.values():
1127 if not isinstance(v, BaseControl):
1128 continue
1129 if v.control_class.id - 1 == control_class:
1130 yield v
1132 def set_to_default(self):
1133 for v in self.values():
1134 if not isinstance(v, BaseControl):
1135 continue
1137 with contextlib.suppress(AttributeError):
1138 v.set_to_default()
1140 def set_clipping(self, clipping: bool) -> None:
1141 for v in self.values():
1142 if isinstance(v, BaseNumericControl):
1143 v.clipping = clipping
1146class BaseControl:
1147 def __init__(self, device, info, control_class, klass=None):
1148 self.device = device
1149 self._info = info
1150 self.id = self._info.id
1151 self.name = self._info.name.decode()
1152 self._config_name = None
1153 self.control_class = control_class
1154 self.type = ControlType(self._info.type)
1155 self.flags = ControlFlag(self._info.flags)
1156 self.minimum = self._info.minimum
1157 self.maximum = self._info.maximum
1158 self.step = self._info.step
1160 try:
1161 self.standard = ControlID(self.id)
1162 except ValueError:
1163 self.standard = None
1165 def __repr__(self):
1166 repr = str(self.config_name)
1168 addrepr = self._get_repr()
1169 addrepr = addrepr.strip()
1170 if addrepr:
1171 repr += f" {addrepr}"
1173 if self.flags:
1174 flags = [flag.name.lower() for flag in ControlFlag if ((self._info.flags & flag) == flag)]
1175 repr += " flags=" + ",".join(flags)
1177 return f"<{type(self).__name__} {repr}>"
1179 def _get_repr(self) -> str:
1180 return ""
1182 def _get_control(self):
1183 # value = get_controls_values(self.device, [self._info])[0]
1184 value = get_controls_values(self.device, (self._info,))[0]
1185 return value
1187 def _set_control(self, value):
1188 if not self.is_writeable:
1189 reasons = []
1190 if self.is_flagged_read_only:
1191 reasons.append("read-only")
1192 if self.is_flagged_inactive:
1193 reasons.append("inactive")
1194 if self.is_flagged_disabled:
1195 reasons.append("disabled")
1196 if self.is_flagged_grabbed:
1197 reasons.append("grabbed")
1198 raise AttributeError(f"{self.__class__.__name__} {self.config_name} is not writeable: {', '.join(reasons)}")
1199 set_controls_values(self.device, ((self._info, value),))
1201 @property
1202 def config_name(self) -> str:
1203 if self._config_name is None:
1204 res = self.name.lower()
1205 for r in ("(", ")"):
1206 res = res.replace(r, "")
1207 for r in (", ", " "):
1208 res = res.replace(r, "_")
1209 self._config_name = res
1210 return self._config_name
1212 @property
1213 def is_flagged_disabled(self) -> bool:
1214 return ControlFlag.DISABLED in self.flags
1216 @property
1217 def is_flagged_grabbed(self) -> bool:
1218 return ControlFlag.GRABBED in self.flags
1220 @property
1221 def is_flagged_read_only(self) -> bool:
1222 return ControlFlag.READ_ONLY in self.flags
1224 @property
1225 def is_flagged_update(self) -> bool:
1226 return ControlFlag.UPDATE in self.flags
1228 @property
1229 def is_flagged_inactive(self) -> bool:
1230 return ControlFlag.INACTIVE in self.flags
1232 @property
1233 def is_flagged_slider(self) -> bool:
1234 return ControlFlag.SLIDER in self.flags
1236 @property
1237 def is_flagged_write_only(self) -> bool:
1238 return ControlFlag.WRITE_ONLY in self.flags
1240 @property
1241 def is_flagged_volatile(self) -> bool:
1242 return ControlFlag.VOLATILE in self.flags
1244 @property
1245 def is_flagged_has_payload(self) -> bool:
1246 return ControlFlag.HAS_PAYLOAD in self.flags
1248 @property
1249 def is_flagged_execute_on_write(self) -> bool:
1250 return ControlFlag.EXECUTE_ON_WRITE in self.flags
1252 @property
1253 def is_flagged_modify_layout(self) -> bool:
1254 return ControlFlag.MODIFY_LAYOUT in self.flags
1256 @property
1257 def is_flagged_dynamic_array(self) -> bool:
1258 return ControlFlag.DYNAMIC_ARRAY in self.flags
1260 @property
1261 def is_writeable(self) -> bool:
1262 return not (self.is_flagged_read_only or self.is_flagged_disabled or self.is_flagged_grabbed)
1265class BaseMonoControl(BaseControl):
1266 def _get_repr(self) -> str:
1267 repr = f" default={self.default}"
1268 if not self.is_flagged_write_only:
1269 try:
1270 repr += f" value={self.value}"
1271 except Exception as error:
1272 repr += f" value=<error: {error!r}>"
1273 return repr
1275 def _convert_read(self, value):
1276 return value
1278 @property
1279 def default(self):
1280 default = get_controls_values(self.device, (self._info,), raw.ControlWhichValue.DEF_VAL)[0]
1281 return self._convert_read(default)
1283 @property
1284 def value(self):
1285 if not self.is_flagged_write_only:
1286 v = self._get_control()
1287 return self._convert_read(v)
1289 def _convert_write(self, value):
1290 return value
1292 def _mangle_write(self, value):
1293 return value
1295 @value.setter
1296 def value(self, value):
1297 v = self._convert_write(value)
1298 v = self._mangle_write(v)
1299 self._set_control(v)
1301 def set_to_default(self):
1302 self.value = self.default
1305class GenericControl(BaseMonoControl):
1306 pass
1309class BaseNumericControl(BaseMonoControl):
1310 lower_bound = -(2**31)
1311 upper_bound = 2**31 - 1
1313 def __init__(self, device, info, control_class, clipping=True):
1314 super().__init__(device, info, control_class)
1315 self.clipping = clipping
1317 if self.minimum < self.lower_bound:
1318 raise RuntimeWarning(
1319 f"Control {self.config_name}'s claimed minimum value {self.minimum} exceeds lower bound of {self.__class__.__name__}"
1320 )
1321 if self.maximum > self.upper_bound:
1322 raise RuntimeWarning(
1323 f"Control {self.config_name}'s claimed maximum value {self.maximum} exceeds upper bound of {self.__class__.__name__}"
1324 )
1326 def _get_repr(self) -> str:
1327 repr = f" min={self.minimum} max={self.maximum} step={self.step}"
1328 repr += super()._get_repr()
1329 return repr
1331 def _convert_read(self, value):
1332 return value
1334 def _convert_write(self, value):
1335 if isinstance(value, int):
1336 return value
1337 else:
1338 try:
1339 v = int(value)
1340 except Exception:
1341 pass
1342 else:
1343 return v
1344 raise ValueError(f"Failed to coerce {value.__class__.__name__} '{value}' to int")
1346 def _mangle_write(self, value):
1347 if self.clipping:
1348 if value < self.minimum:
1349 return self.minimum
1350 elif value > self.maximum:
1351 return self.maximum
1352 else:
1353 if value < self.minimum:
1354 raise ValueError(f"Control {self.config_name}: {value} exceeds allowed minimum {self.minimum}")
1355 elif value > self.maximum:
1356 raise ValueError(f"Control {self.config_name}: {value} exceeds allowed maximum {self.maximum}")
1357 return value
1359 def increase(self, steps: int = 1):
1360 self.value += steps * self.step
1362 def decrease(self, steps: int = 1):
1363 self.value -= steps * self.step
1365 def set_to_minimum(self):
1366 self.value = self.minimum
1368 def set_to_maximum(self):
1369 self.value = self.maximum
1372class IntegerControl(BaseNumericControl):
1373 lower_bound = -(2**31)
1374 upper_bound = 2**31 - 1
1377class Integer64Control(BaseNumericControl):
1378 lower_bound = -(2**63)
1379 upper_bound = 2**63 - 1
1382class U8Control(BaseNumericControl):
1383 lower_bound = 0
1384 upper_bound = 2**8
1387class U16Control(BaseNumericControl):
1388 lower_bound = 0
1389 upper_bound = 2**16
1392class U32Control(BaseNumericControl):
1393 lower_bound = 0
1394 upper_bound = 2**32
1397class BooleanControl(BaseMonoControl):
1398 _true = ["true", "1", "yes", "on", "enable"]
1399 _false = ["false", "0", "no", "off", "disable"]
1401 def _convert_read(self, value):
1402 return bool(value)
1404 def _convert_write(self, value):
1405 if isinstance(value, bool):
1406 return value
1407 elif isinstance(value, str):
1408 if value in self._true:
1409 return True
1410 elif value in self._false:
1411 return False
1412 else:
1413 try:
1414 v = bool(value)
1415 except Exception:
1416 pass
1417 else:
1418 return v
1419 raise ValueError(f"Failed to coerce {value.__class__.__name__} '{value}' to bool")
1422class MenuControl(BaseMonoControl, UserDict):
1423 def __init__(self, device, info, control_class):
1424 BaseControl.__init__(self, device, info, control_class)
1425 UserDict.__init__(self)
1427 if self.type == ControlType.MENU:
1428 self.data = {item.index: item.name.decode() for item in iter_read_menu(self.device, self)}
1429 elif self.type == ControlType.INTEGER_MENU:
1430 self.data = {item.index: item.value for item in iter_read_menu(self.device, self)}
1431 else:
1432 raise TypeError(f"MenuControl only supports control types MENU or INTEGER_MENU, but not {self.type.name}")
1434 def _convert_write(self, value):
1435 return int(value)
1438class ButtonControl(BaseControl):
1439 def push(self):
1440 self._set_control(1)
1443class CompoundControl(BaseControl):
1444 @property
1445 def default(self):
1446 return get_controls_values(self.device, [self._info], raw.ControlWhichValue.DEF_VAL)[0]
1448 @property
1449 def value(self):
1450 if not self.is_flagged_write_only:
1451 return get_controls_values(self.device, [self._info])[0]
1453 @value.setter
1454 def value(self, value):
1455 set_controls_values(self.device, ((self._info, value),))
1457 def set_to_default(self):
1458 self.value = self.default
1461class DeviceHelper:
1462 def __init__(self, device: Device):
1463 super().__init__()
1464 self.device = device
1467class InfoEx(DeviceHelper):
1468 INFO_REPR = """\
1469driver = {info.driver}
1470card = {info.card}
1471bus = {info.bus_info}
1472version = {info.version}
1473capabilities = {capabilities}
1474device_capabilities = {device_capabilities}
1475buffers = {buffers}
1476"""
1478 def __init__(self, device: "Device"):
1479 self.device = device
1480 self._raw_capabilities_cache = None
1482 def __repr__(self):
1483 dcaps = "|".join(cap.name for cap in flag_items(self.device_capabilities))
1484 caps = "|".join(cap.name for cap in flag_items(self.capabilities))
1485 buffers = "|".join(buff.name for buff in self.buffers)
1486 return self.INFO_REPR.format(info=self, capabilities=caps, device_capabilities=dcaps, buffers=buffers)
1488 @property
1489 def raw_capabilities(self) -> raw.v4l2_capability:
1490 if self._raw_capabilities_cache is None:
1491 self._raw_capabilities_cache = read_capabilities(self.device)
1492 return self._raw_capabilities_cache
1494 @property
1495 def driver(self) -> str:
1496 return self.raw_capabilities.driver.decode()
1498 @property
1499 def card(self) -> str:
1500 return self.raw_capabilities.card.decode()
1502 @property
1503 def bus_info(self) -> str:
1504 return self.raw_capabilities.bus_info.decode()
1506 @property
1507 def version_tuple(self) -> tuple:
1508 caps = self.raw_capabilities
1509 return (
1510 (caps.version & 0xFF0000) >> 16,
1511 (caps.version & 0x00FF00) >> 8,
1512 (caps.version & 0x0000FF),
1513 )
1515 @property
1516 def version(self) -> str:
1517 return ".".join(map(str, self.version_tuple))
1519 @property
1520 def capabilities(self) -> Capability:
1521 return Capability(self.raw_capabilities.capabilities)
1523 @property
1524 def device_capabilities(self) -> Capability:
1525 return Capability(self.raw_capabilities.device_caps)
1527 @property
1528 def buffers(self):
1529 dev_caps = self.device_capabilities
1530 return [typ for typ in BufferType if Capability[typ.name] in dev_caps]
1532 def get_crop_capabilities(self, buffer_type: BufferType) -> CropCapability:
1533 return read_crop_capabilities(self.device, buffer_type)
1535 @property
1536 def crop_capabilities(self) -> dict[BufferType, CropCapability]:
1537 buffer_types = CROP_BUFFER_TYPES & set(self.buffers)
1538 result = {}
1539 for buffer_type in buffer_types:
1540 crop_cap = self.get_crop_capabilities(buffer_type)
1541 if crop_cap is None:
1542 continue
1543 result[buffer_type] = crop_cap
1544 return result
1546 @property
1547 def formats(self):
1548 img_fmt_buffer_types = IMAGE_FORMAT_BUFFER_TYPES & set(self.buffers)
1549 return [
1550 image_format
1551 for buffer_type in img_fmt_buffer_types
1552 for image_format in iter_read_formats(self.device, buffer_type)
1553 ]
1555 def buffer_formats(self, buffer_type) -> list[ImageFormat]:
1556 return list(iter_read_formats(self.device, buffer_type))
1558 def format_frame_sizes(self, pixel_format) -> list[FrameSize]:
1559 return list(iter_read_frame_sizes(self.device, pixel_format))
1561 def frame_sizes(self):
1562 results = []
1563 for fmt in self.formats:
1564 results.extend(self.format_frame_sizes(fmt.pixel_format))
1565 return results
1567 def fps_intervals(self, pixel_format, width, height) -> list[FrameType]:
1568 return list(iter_read_frame_intervals(self.device, pixel_format, width, height))
1570 @property
1571 def frame_types(self):
1572 pixel_formats = {fmt.pixel_format for fmt in self.formats}
1573 return list(iter_read_pixel_formats_frame_intervals(self.device, pixel_formats))
1575 @property
1576 def inputs(self) -> list[Input]:
1577 return list(iter_read_inputs(self.device))
1579 @property
1580 def outputs(self):
1581 return list(iter_read_outputs(self.device))
1583 @property
1584 def controls(self):
1585 return list(iter_read_controls(self.device))
1587 @property
1588 def video_standards(self) -> list[Standard]:
1589 """List of video standards for the active input"""
1590 return list(iter_read_video_standards(self.device))
1593class BufferManager(DeviceHelper):
1594 def __init__(self, device: Device, buffer_type: BufferType, size: int = 2):
1595 super().__init__(device)
1596 self.type = buffer_type
1597 self.size = size
1598 self.buffers = None
1599 self.name = type(self).__name__
1601 def formats(self) -> list:
1602 formats = self.device.info.formats
1603 return [fmt for fmt in formats if fmt.type == self.type]
1605 def crop_capabilities(self):
1606 crop_capabilities = self.device.info.crop_capabilities
1607 return [crop for crop in crop_capabilities if crop.type == self.type]
1609 def query_buffer(self, memory, index):
1610 return self.device.query_buffer(self.type, memory, index)
1612 def enqueue_buffer(self, memory: Memory, size: int, index: int) -> raw.v4l2_buffer:
1613 return self.device.enqueue_buffer(self.type, memory, size, index)
1615 def dequeue_buffer(self, memory: Memory) -> raw.v4l2_buffer:
1616 return self.device.dequeue_buffer(self.type, memory)
1618 def enqueue_buffers(self, memory: Memory) -> list[raw.v4l2_buffer]:
1619 return self.device.enqueue_buffers(self.type, memory, self.size)
1621 def free_buffers(self, memory: Memory):
1622 result = self.device.free_buffers(self.type, memory)
1623 self.buffers = None
1624 return result
1626 def create_buffers(self, memory: Memory):
1627 if self.buffers:
1628 raise V4L2Error("buffers already requested. free first")
1629 self.buffers = self.device.create_buffers(self.type, memory, self.size)
1630 return self.buffers
1632 def request_buffers(self, memory: Memory):
1633 if self.buffers:
1634 raise V4L2Error("buffers already requested. free first")
1635 self.buffers = self.device.request_buffers(self.type, memory, self.size)
1636 return self.buffers
1638 def set_format(self, width, height, pixel_format="MJPG"):
1639 return self.device.set_format(self.type, width, height, pixel_format)
1641 def get_format(self) -> Format:
1642 return self.device.get_format(self.type)
1644 def set_fps(self, fps):
1645 return self.device.set_fps(self.type, fps)
1647 def get_fps(self):
1648 return self.device.get_fps(self.type)
1650 def set_selection(self, target, rectangle):
1651 return self.device.set_selection(self.type, target, rectangle)
1653 def get_selection(self, target):
1654 return self.device.get_selection(self.type, target)
1656 def stream_on(self):
1657 self.device.stream_on(self.type)
1659 def stream_off(self):
1660 self.device.stream_off(self.type)
1662 start = stream_on
1663 stop = stream_off
1666class Frame:
1667 """The resulting object from an acquisition."""
1669 __slots__ = ["format", "buff", "data", "user_data"]
1671 def __init__(self, data: bytes, buff: raw.v4l2_buffer, format: Format):
1672 self.format = format
1673 self.buff = buff
1674 self.data = data
1675 self.user_data = None
1677 def __bytes__(self):
1678 return self.data
1680 def __len__(self):
1681 return len(self.data)
1683 def __getitem__(self, index):
1684 return self.data[index]
1686 def __repr__(self) -> str:
1687 return (
1688 f"<{type(self).__name__} width={self.width}, height={self.height}, "
1689 f"format={self.pixel_format.name}, frame_nb={self.frame_nb}, timestamp={self.timestamp}>"
1690 )
1692 def __format__(self, spec):
1693 if spec in {"", "s"}:
1694 return str(self)
1695 elif spec == "r":
1696 return repr(self)
1697 elif spec == "f":
1698 return f"{self.width}x{self.height} {self.pixel_format.name}"
1699 elif spec == "l":
1700 return f"#{self.frame_nb} {self.timestamp} {self:f}"
1701 return str(getattr(self, spec))
1703 @property
1704 def width(self):
1705 return self.format.width
1707 @property
1708 def height(self):
1709 return self.format.height
1711 @property
1712 def nbytes(self):
1713 return self.buff.bytesused
1715 @property
1716 def pixel_format(self):
1717 return PixelFormat(self.format.pixel_format)
1719 @property
1720 def index(self):
1721 return self.buff.index
1723 @property
1724 def type(self):
1725 return BufferType(self.buff.type)
1727 @property
1728 def flags(self):
1729 return BufferFlag(self.buff.flags)
1731 @property
1732 def timestamp(self):
1733 return self.buff.timestamp.secs + self.buff.timestamp.usecs * 1e-6
1735 @property
1736 def frame_nb(self):
1737 return self.buff.sequence
1739 @property
1740 def memory(self):
1741 return Memory(self.buff.memory)
1743 @property
1744 def time_type(self):
1745 if BufferFlag.TIMECODE in self.flags:
1746 return TimeCodeType(self.buff.timecode.type)
1748 @property
1749 def time_flags(self):
1750 if BufferFlag.TIMECODE in self.flags:
1751 return TimeCodeFlag(self.buff.timecode.flags)
1753 @property
1754 def time_frame(self):
1755 if BufferFlag.TIMECODE in self.flags:
1756 return self.buff.timecode.frames
1758 @property
1759 def array(self):
1760 import numpy
1762 return numpy.frombuffer(self.data, dtype="u1")
1765class VideoCapture(BufferManager):
1766 def __init__(self, device: Device, size: int = 2, buffer_type=None):
1767 super().__init__(device, BufferType.VIDEO_CAPTURE, size)
1768 self.buffer = None
1769 self._buffer_type = buffer_type
1771 def __enter__(self):
1772 self.open()
1773 return self
1775 def __exit__(self, *exc):
1776 self.close()
1778 def __iter__(self):
1779 yield from self.buffer
1781 async def __aiter__(self):
1782 async for frame in self.buffer:
1783 yield frame
1785 @property
1786 def buffer_type(self):
1787 capabilities = self.device.info.device_capabilities
1788 if Capability.VIDEO_CAPTURE not in capabilities:
1789 raise V4L2Error("Device doesn't have VIDEO_CAPTURE capability")
1790 if self._buffer_type is None:
1791 if Capability.STREAMING in capabilities:
1792 return MemoryMap
1793 elif Capability.READWRITE in capabilities:
1794 return ReadSource
1795 return None
1796 return self._buffer_type
1798 def create_buffer(self):
1799 buffer_type = self.buffer_type
1800 if buffer_type is None:
1801 raise OSError("Device needs to support STREAMING or READWRITE capability")
1802 return buffer_type(self)
1804 def arm(self):
1805 self.device.log.info("Preparing for video capture...")
1806 self.buffer = self.create_buffer()
1807 self.buffer.open()
1809 def disarm(self):
1810 self.buffer.close()
1811 self.buffer = None
1813 def open(self):
1814 if self.buffer is None:
1815 self.arm()
1816 self.stream_on()
1817 self.device.log.info("Video capture started!")
1819 def close(self):
1820 if self.buffer:
1821 self.device.log.info("Closing video capture...")
1822 self.stream_off()
1823 self.disarm()
1824 self.device.log.info("Video capture closed")
1827class ReadSource(ReentrantOpen):
1828 def __init__(self, buffer_manager: BufferManager):
1829 super().__init__()
1830 self.buffer_manager = buffer_manager
1831 self.frame_reader = FrameReader(self.device, self.raw_read)
1832 self.format = None
1834 def __iter__(self) -> Iterator[Frame]:
1835 with self.frame_reader:
1836 while True:
1837 yield self.frame_reader.read()
1839 async def __aiter__(self) -> AsyncIterator[Frame]:
1840 async with self.frame_reader:
1841 while True:
1842 yield await self.frame_reader.aread()
1844 def open(self) -> None:
1845 self.format = self.buffer_manager.get_format()
1847 def close(self) -> None:
1848 self.format = None
1850 @property
1851 def device(self) -> Device:
1852 return self.buffer_manager.device
1854 def raw_grab(self) -> tuple[bytes, raw.v4l2_buffer]:
1855 data = os.read(self.device, 2**31 - 1)
1856 ns = time.time_ns()
1857 buff = raw.v4l2_buffer()
1858 buff.bytesused = len(data)
1859 buff.timestamp.set_ns(ns)
1860 return data, buff
1862 def raw_read(self) -> Frame:
1863 data, buff = self.raw_grab()
1864 return Frame(data, buff, self.format)
1866 def wait_read(self) -> Frame:
1867 device = self.device
1868 if device.io.select is not None:
1869 device.io.select.select((device,), (), ())
1870 return self.raw_read()
1872 def read(self) -> Frame:
1873 # first time we check what mode device was opened (blocking vs non-blocking)
1874 # if file was opened with O_NONBLOCK: DQBUF will not block until a buffer
1875 # is available for read. So we need to do it here
1876 if self.device.is_blocking:
1877 self.read = self.raw_read
1878 else:
1879 self.read = self.wait_read
1880 return self.read()
1883class MemorySource(ReentrantOpen):
1884 def __init__(self, buffer_manager: BufferManager, source: Memory):
1885 super().__init__()
1886 self.buffer_manager = buffer_manager
1887 self.source = source
1888 self.buffers = None
1889 self.queue = BufferQueue(buffer_manager, source)
1890 self.frame_reader = FrameReader(self.device, self.raw_read)
1891 self.format = None
1893 def __iter__(self) -> Iterator[Frame]:
1894 with self.frame_reader:
1895 yield from self.frame_reader
1897 def __aiter__(self) -> AsyncIterator[Frame]:
1898 return astream(self.device, self.raw_read)
1900 @property
1901 def device(self) -> Device:
1902 return self.buffer_manager.device
1904 def prepare_buffers(self):
1905 raise NotImplementedError
1907 def release_buffers(self):
1908 self.device.log.info("Freeing buffers...")
1909 for buf in self.buffers:
1910 buf.close()
1911 self.buffers = None
1912 self.buffer_manager.free_buffers(self.source)
1913 self.format = None
1914 self.device.log.info("Buffers freed")
1916 def open(self) -> None:
1917 self.format = self.buffer_manager.get_format()
1918 if self.buffers is None:
1919 self.prepare_buffers()
1921 def close(self) -> None:
1922 if self.buffers:
1923 self.release_buffers()
1925 def grab_from_buffer(self, buff: raw.v4l2_buffer, into=None):
1926 view = self.buffers[buff.index]
1927 size = buff.bytesused
1928 if into is None:
1929 return view[:size], buff
1930 into[:size] = memoryview(view)[:size]
1931 return memoryview(into)[:size], buff
1933 def raw_grab(self, into=None) -> tuple[Buffer, raw.v4l2_buffer]:
1934 with self.queue as buff:
1935 return self.grab_from_buffer(buff, into)
1937 def raw_read(self, into=None) -> Frame:
1938 data, buff = self.raw_grab(into)
1939 return Frame(data, buff, self.format)
1941 def wait_read(self) -> Frame:
1942 device = self.device
1943 if device.io.select is not None:
1944 device.io.select((device,), (), ())
1945 return self.raw_read()
1947 def read(self) -> Frame:
1948 # first time we check what mode device was opened (blocking vs non-blocking)
1949 # if file was opened with O_NONBLOCK: DQBUF will not block until a buffer
1950 # is available for read. So we need to do it here
1951 if self.device.is_blocking:
1952 self.read = self.raw_read
1953 else:
1954 self.read = self.wait_read
1955 return self.read()
1957 def raw_write(self, data: Buffer) -> raw.v4l2_buffer:
1958 with self.queue as buff:
1959 size = getattr(data, "nbytes", len(data))
1960 memory = self.buffers[buff.index]
1961 memory[:size] = data
1962 buff.bytesused = size
1963 return buff
1965 def wait_write(self, data: Buffer) -> raw.v4l2_buffer:
1966 device = self.device
1967 if device.io.select is not None:
1968 _, r, _ = device.io.select.select((), (device,), ())
1969 return self.raw_write(data)
1971 def write(self, data: Buffer) -> raw.v4l2_buffer:
1972 # first time we check what mode device was opened (blocking vs non-blocking)
1973 # if file was opened with O_NONBLOCK: DQBUF will not block until a buffer
1974 # is available for write. So we need to do it here
1975 if self.device.is_blocking:
1976 self.write = self.raw_write
1977 else:
1978 self.write = self.wait_write
1979 return self.write(data)
1982class UserPtr(MemorySource):
1983 def __init__(self, buffer_manager: BufferManager):
1984 super().__init__(buffer_manager, Memory.USERPTR)
1985 self.log = self.device.log.getChild("MemoryMap")
1987 def prepare_buffers(self):
1988 self.log.info("Reserving buffers...")
1989 self.buffer_manager.create_buffers(self.source)
1990 size = self.format.size
1991 self.buffers = []
1992 for index in range(self.buffer_manager.size):
1993 data = ctypes.create_string_buffer(size)
1994 self.buffers.append(data)
1995 buff = raw.v4l2_buffer()
1996 buff.index = index
1997 buff.type = self.buffer_manager.type
1998 buff.memory = self.source
1999 buff.m.userptr = ctypes.addressof(data)
2000 buff.length = size
2001 self.queue.enqueue(buff)
2002 self.log.info("Buffers reserved")
2005class MemoryMap(MemorySource):
2006 def __init__(self, buffer_manager: BufferManager):
2007 super().__init__(buffer_manager, Memory.MMAP)
2008 self.log = self.device.log.getChild("MemoryMap")
2010 def prepare_buffers(self):
2011 self.log.info("Reserving buffers...")
2012 buffers = self.buffer_manager.create_buffers(self.source)
2013 fd = self.device
2014 self.buffers = [mmap_from_buffer(fd, buff) for buff in buffers]
2015 self.buffer_manager.enqueue_buffers(Memory.MMAP)
2016 self.format = self.buffer_manager.get_format()
2017 self.log.info("Buffers reserved")
2020class EventReader(ReentrantOpen):
2021 def __init__(self, device: Device, max_queue_size=100):
2022 super().__init__()
2023 self.device = device
2024 self._loop = None
2025 self._selector = None
2026 self._buffer = None
2027 self._max_queue_size = max_queue_size
2029 async def __aenter__(self):
2030 if self.device.is_blocking:
2031 raise V4L2Error("Cannot use async event reader on blocking device")
2032 self._buffer = asyncio.Queue(maxsize=self._max_queue_size)
2033 self._selector = select.epoll()
2034 self._loop = asyncio.get_event_loop()
2035 self._loop.add_reader(self._selector, self._on_event)
2036 self._selector.register(self.device, select.EPOLLPRI)
2037 return self
2039 async def __aexit__(self, exc_type, exc_value, traceback):
2040 self._selector.unregister(self.device)
2041 self._loop.remove_reader(self._selector)
2042 self._selector.close()
2043 self._selector = None
2044 self._loop = None
2045 self._buffer = None
2047 async def __aiter__(self):
2048 while True:
2049 yield await self.aread()
2051 def __iter__(self):
2052 device = self.device
2053 with self:
2054 if device.is_blocking:
2055 while True:
2056 yield device.deque_event()
2057 else:
2058 while True:
2059 for _ in self._selector.poll():
2060 yield device.deque_event()
2062 def _on_event(self):
2063 task = self._loop.create_future()
2064 try:
2065 self._selector.poll(0) # avoid blocking
2066 event = self.device.deque_event()
2067 task.set_result(event)
2068 except Exception as error:
2069 task.set_exception(error)
2071 buffer = self._buffer
2072 if buffer.full():
2073 self.device.log.debug("missed event")
2074 buffer.popleft()
2075 buffer.put_nowait(task)
2077 def open(self):
2078 if not self.device.is_blocking:
2079 self._selector = select.epoll()
2080 self._selector.register(self.device, select.EPOLLPRI)
2082 def close(self):
2083 if self._selector:
2084 self._selector.close()
2085 self._selector = None
2087 def fileno(self) -> int:
2088 """Return the underlying file descriptor (an integer) of the stream if it exists"""
2089 return self._selector.fileno()
2091 def read(self, timeout=None):
2092 device = self.device
2093 if not device.is_blocking:
2094 _, _, exc = device.io.select.select((), (), (self.device,), timeout)
2095 if not exc:
2096 return
2097 return device.deque_event()
2099 async def aread(self):
2100 """Wait for next event or return last event in queue"""
2101 task = await self._buffer.get()
2102 return await task
2105class FrameReader:
2106 def __init__(self, device: Device, raw_read: Callable[[], Buffer], max_queue_size: int = 1):
2107 self.device = device
2108 self.raw_read = raw_read
2109 self._loop = None
2110 self._selector = None
2111 self._buffer = None
2112 self._max_queue_size = max_queue_size
2113 self._device_fd = None
2115 async def __aenter__(self) -> Self:
2116 if self.device.is_blocking:
2117 raise V4L2Error("Cannot use async frame reader on blocking device")
2118 self._device_fd = self.device
2119 self._buffer = asyncio.Queue(maxsize=self._max_queue_size)
2120 self._selector = select.epoll()
2121 self._loop = asyncio.get_event_loop()
2122 self._loop.add_reader(self._selector, self._on_event)
2123 self._selector.register(self._device_fd, select.POLLIN)
2124 return self
2126 async def __aexit__(self, exc_type, exc_value, traceback):
2127 with contextlib.suppress(OSError):
2128 # device may have been closed by now
2129 self._selector.unregister(self._device_fd)
2130 self._loop.remove_reader(self._selector)
2131 self._selector.close()
2132 self._selector = None
2133 self._loop = None
2134 self._buffer = None
2136 def __enter__(self) -> Self:
2137 return self
2139 def __exit__(self, exc_type, exc_value, tb):
2140 pass
2142 def __iter__(self):
2143 while True:
2144 yield self.read()
2146 def _on_event(self) -> None:
2147 task = self._loop.create_future()
2148 try:
2149 self._selector.poll(0) # avoid blocking
2150 data = self.raw_read()
2151 task.set_result(data)
2152 except Exception as error:
2153 task.set_exception(error)
2155 buffer = self._buffer
2156 if buffer.full():
2157 self.device.log.warn("missed frame")
2158 buffer.get_nowait()
2159 buffer.put_nowait(task)
2161 def read(self, timeout: Optional[float] = None) -> Frame:
2162 if not self.device.is_blocking:
2163 read, _, _ = self.device.io.select.select((self.device,), (), (), timeout)
2164 if not read:
2165 return
2166 return self.raw_read()
2168 async def aread(self) -> Frame:
2169 """Wait for next frame or return last frame"""
2170 task = await self._buffer.get()
2171 return await task
2174class BufferQueue:
2175 def __init__(self, buffer_manager: BufferManager, memory: Memory):
2176 self.buffer_manager = buffer_manager
2177 self.memory = memory
2178 self.raw_buffer = None
2180 def enqueue(self, buff: raw.v4l2_buffer):
2181 enqueue_buffer_raw(self.buffer_manager.device, buff)
2183 def dequeue(self):
2184 return self.buffer_manager.dequeue_buffer(self.memory)
2186 def __enter__(self) -> raw.v4l2_buffer:
2187 # get next buffer that has some data in it
2188 self.raw_buffer = self.dequeue()
2189 return self.raw_buffer
2191 def __exit__(self, *exc):
2192 # Make a copy of buffer. We need the original buffer that was sent to
2193 # dequeue in to keep frame info like frame number, timestamp, etc
2194 raw_buffer = raw.v4l2_buffer()
2195 memcpy(raw_buffer, self.raw_buffer)
2196 self.enqueue(raw_buffer)
2199class Write(ReentrantOpen):
2200 def __init__(self, buffer_manager: BufferManager):
2201 super().__init__()
2202 self.buffer_manager = buffer_manager
2204 @property
2205 def device(self) -> Device:
2206 return self.buffer_manager.device
2208 def raw_write(self, data: Buffer) -> None:
2209 self.device.write(data)
2211 def wait_write(self, data: Buffer) -> None:
2212 device = self.device
2213 if device.io.select is not None:
2214 _, w, _ = device.io.select((), (device,), ())
2215 if not w:
2216 raise OSError("Closed")
2217 self.raw_write(data)
2219 def write(self, data: Buffer) -> None:
2220 # first time we check what mode device was opened (blocking vs non-blocking)
2221 # if file was opened with O_NONBLOCK: DQBUF will not block until a buffer
2222 # is available for write. So we need to do it here
2223 if self.device.is_blocking:
2224 self.write = self.raw_write
2225 else:
2226 self.write = self.wait_write
2227 return self.write(data)
2229 def open(self) -> None:
2230 pass
2232 def close(self) -> None:
2233 pass
2236class VideoOutput(BufferManager):
2237 def __init__(self, device: Device, size: int = 2, sink: Capability = None):
2238 super().__init__(device, BufferType.VIDEO_OUTPUT, size)
2239 self.buffer = None
2240 self.sink = sink
2242 def __enter__(self) -> Self:
2243 self.open()
2244 return self
2246 def __exit__(self, *exc) -> None:
2247 self.close()
2249 def open(self) -> None:
2250 if self.buffer is not None:
2251 return
2252 self.device.log.info("Preparing for video output...")
2253 capabilities = self.device.info.device_capabilities
2254 # Don't check for output capability. Some drivers (ex: v4l2loopback) don't
2255 # report being output capable so that apps like zoom recognize them
2256 # if Capability.VIDEO_OUTPUT not in capabilities:
2257 # raise V4L2Error("device lacks VIDEO_OUTPUT capability")
2258 sink = capabilities if self.sink is None else self.sink
2259 if Capability.STREAMING in sink:
2260 self.device.log.info("Video output using memory map")
2261 self.buffer = MemoryMap(self)
2262 elif Capability.READWRITE in sink:
2263 self.device.log.info("Video output using write")
2264 self.buffer = Write(self)
2265 else:
2266 raise OSError("Device needs to support STREAMING or READWRITE capability")
2267 self.buffer.open()
2268 self.stream_on()
2269 self.device.log.info("Video output started!")
2271 def close(self) -> None:
2272 if self.buffer:
2273 self.device.log.info("Closing video output...")
2274 try:
2275 self.stream_off()
2276 except Exception as error:
2277 self.device.log.warning("Failed to close stream: %r", error)
2278 try:
2279 self.buffer.close()
2280 except Exception as error:
2281 self.device.log.warning("Failed to close buffer: %r", error)
2282 self.buffer = None
2283 self.device.log.info("Video output closed")
2285 def write(self, data: Buffer) -> None:
2286 self.buffer.write(data)
2289def iter_video_files(path: PathLike = "/dev") -> Iterable[Path]:
2290 """Returns an iterator over all video files"""
2291 return iter_device_files(path=path, pattern="video*")
2294def iter_devices(path: PathLike = "/dev", **kwargs) -> Iterable[Device]:
2295 """Returns an iterator over all video devices"""
2296 return (Device(name, **kwargs) for name in iter_video_files(path=path))
2299def iter_video_capture_files(path: PathLike = "/dev") -> Iterable[Path]:
2300 """Returns an iterator over all video files that have CAPTURE capability"""
2302 def filt(filename):
2303 with IO.open(filename) as fobj:
2304 caps = read_capabilities(fobj)
2305 return Capability.VIDEO_CAPTURE in Capability(caps.device_caps)
2307 return filter(filt, iter_video_files(path))
2310def iter_video_capture_devices(path: PathLike = "/dev", **kwargs) -> Iterable[Device]:
2311 """Returns an iterator over all video devices that have CAPTURE capability"""
2312 return (Device(name, **kwargs) for name in iter_video_capture_files(path))
2315def iter_video_output_files(path: PathLike = "/dev") -> Iterable[Path]:
2316 """
2317 Some drivers (ex: v4l2loopback) don't report being output capable so that
2318 apps like zoom recognize them as valid capture devices so some results might
2319 be missing
2320 """
2322 def filt(filename):
2323 with IO.open(filename) as fobj:
2324 caps = read_capabilities(fobj)
2325 return Capability.VIDEO_OUTPUT in Capability(caps.device_caps)
2327 return filter(filt, iter_video_files(path))
2330def iter_video_output_devices(path: PathLike = "/dev", **kwargs) -> Iterable[Device]:
2331 """Returns an iterator over all video devices that have VIDEO OUTPUT capability"""
2332 return (Device(name, **kwargs) for name in iter_video_output_files(path))
2335find = make_find(iter_devices)