Coverage for linuxpy/video/device.py: 83%
1508 statements
« prev ^ index » next coverage.py v7.6.8, created at 2025-05-27 13:54 +0200
« prev ^ index » next coverage.py v7.6.8, created at 2025-05-27 13:54 +0200
1#
2# This file is part of the linuxpy project
3#
4# Copyright (c) 2023 Tiago Coutinho
5# Distributed under the GPLv3 license. See LICENSE for more info.
7"""Human friendly interface to V4L2 (Video 4 Linux 2) subsystem."""
9import asyncio
10import collections
11import contextlib
12import copy
13import ctypes
14import enum
15import errno
16import fractions
17import logging
18import mmap
19import os
20import select
21import time
22from collections import UserDict
23from pathlib import Path
25from linuxpy.ctypes import cast, cenum, create_string_buffer, memcpy, string_at
26from linuxpy.device import (
27 BaseDevice,
28 ReentrantOpen,
29 iter_device_files,
30)
31from linuxpy.io import IO
32from linuxpy.ioctl import ioctl
33from linuxpy.types import AsyncIterator, Buffer, Callable, Iterable, Iterator, Optional, PathLike, Self, Union
34from linuxpy.util import astream, bit_indexes, make_find
36from . import raw
# Module wide logger; memory-mapping calls are traced on a dedicated child
# logger so they can be filtered independently.
log = logging.getLogger(__name__)
log_mmap = log.getChild("mmap")
class V4L2Error(Exception):
    """Error type for Video for Linux 2 (V4L2) related failures."""
def _enum(name, prefix, klass=enum.IntEnum):
    """Create an enumeration from the attributes of ``raw`` that start with *prefix*.

    Args:
        name: name given to the new enumeration.
        prefix: attribute name prefix to select; it is removed from the
            attribute name to form the member name.
        klass: enumeration base class used to build the result.

    Returns:
        The enumeration created by calling *klass* with the collected
        ``(member name, value)`` pairs.
    """
    members = []
    for attr_name in dir(raw):
        if not attr_name.startswith(prefix):
            continue
        members.append((attr_name.replace(prefix, ""), getattr(raw, attr_name)))
    return klass(name, members)
# Friendly aliases for the enumerations declared in the ``raw`` module.
FrameSizeType = raw.Frmsizetypes
FrameIntervalType = raw.Frmivaltypes
Field = raw.Field
ImageFormatFlag = raw.ImageFormatFlag
Capability = raw.Capability
ControlID = raw.ControlID
ControlFlag = raw.ControlFlag
ControlType = raw.CtrlType
ControlClass = raw.ControlClass
SelectionTarget = raw.SelectionTarget
EventType = raw.EventType
EventControlChange = raw.EventControlChange
IOC = raw.IOC
BufferType = raw.BufType
BufferFlag = raw.BufferFlag
InputType = raw.InputType
PixelFormat = raw.PixelFormat
MetaFormat = raw.MetaFormat
# (FrameSizeType was assigned a second time here; the duplicate was dropped.)
Memory = raw.Memory
InputStatus = raw.InputStatus
OutputType = raw.OutputType
InputCapabilities = raw.InputCapabilities
OutputCapabilities = raw.OutputCapabilities
Priority = raw.Priority
TimeCodeType = raw.TimeCodeType
TimeCodeFlag = raw.TimeCodeFlag
EventSubscriptionFlag = raw.EventSubscriptionFlag
StandardID = raw.StandardID
def V4L2_CTRL_ID2CLASS(id_):
    """Return the control class part of control id *id_*.

    Only the bits selected by the mask ``0x0FFF0000`` are kept.
    """
    class_mask = 0x0FFF0000  # unsigned long
    return id_ & class_mask
def human_pixel_format(ifmt):
    """Return the four characters encoded in the integer format code *ifmt*.

    The lowest byte becomes the first character and the fourth byte the last
    one; each byte is mapped to the character with the same code point.
    """
    low_32_bits = ifmt & 0xFFFFFFFF
    # latin-1 maps every byte value to the code point with the same number,
    # which is what chr() does for values below 256.
    return low_32_bits.to_bytes(4, "little").decode("latin-1")
def _format_human_str(self):
    """Return the four character text for the value of this format member."""
    return human_pixel_format(self.value)


# Both format enumerations get the same ``human_str`` helper.
PixelFormat.human_str = _format_human_str
MetaFormat.human_str = _format_human_str
# Lightweight immutable records returned by the helpers of this module.

ImageFormat = collections.namedtuple("ImageFormat", ["type", "description", "flags", "pixel_format"])

MetaFmt = collections.namedtuple("MetaFmt", ["format", "max_buffer_size", "width", "height", "bytes_per_line"])

Format = collections.namedtuple("Format", ["width", "height", "pixel_format", "size"])

CropCapability = collections.namedtuple("CropCapability", ["type", "bounds", "defrect", "pixel_aspect"])

Rect = collections.namedtuple("Rect", ["left", "top", "width", "height"])

Size = collections.namedtuple("Size", ["width", "height"])

FrameType = collections.namedtuple(
    "FrameType",
    ["type", "pixel_format", "width", "height", "min_fps", "max_fps", "step_fps"],
)

# NOTE: the generated class names are "InputType"/"OutputType", not "Input"/"Output".
Input = collections.namedtuple(
    "InputType",
    ["index", "name", "type", "audioset", "tuner", "std", "status", "capabilities"],
)

Output = collections.namedtuple(
    "OutputType",
    ["index", "name", "type", "audioset", "modulator", "std", "capabilities"],
)

Standard = collections.namedtuple("Standard", ["index", "id", "name", "frameperiod", "framelines"])
# Buffer types listed as crop capable.
CROP_BUFFER_TYPES = {
    BufferType.VIDEO_CAPTURE,
    BufferType.VIDEO_CAPTURE_MPLANE,
    BufferType.VIDEO_OUTPUT,
    BufferType.VIDEO_OUTPUT_MPLANE,
    BufferType.VIDEO_OVERLAY,
}

# Buffer types listed for image format enumeration: the video types above plus
# the two metadata buffer types.
IMAGE_FORMAT_BUFFER_TYPES = CROP_BUFFER_TYPES | {
    BufferType.META_CAPTURE,
    BufferType.META_OUTPUT,
}
def mem_map(fd, length, offset):
    """Memory map *length* bytes of *fd* starting at *offset*.

    The request is logged at debug level on the mmap logger before the
    mapping is created.

    Returns:
        The new ``mmap.mmap`` object.
    """
    log_mmap.debug("%s, length=%d, offset=%d", fd, length, offset)
    mapping = mmap.mmap(fd, length, offset=offset)
    return mapping
def flag_items(flag):
    """Return the members of the type of *flag* that are contained in *flag*.

    Members are listed in the iteration order of the flag type.
    """
    contained = []
    for member in type(flag):
        if member in flag:
            contained.append(member)
    return contained
def raw_crop_caps_to_crop_caps(crop):
    """Translate a raw crop capability structure into a ``CropCapability``.

    Args:
        crop: object exposing ``type``, ``bounds``, ``defrect`` and
            ``pixelaspect`` like the structure filled by the CROPCAP ioctl.

    Returns:
        CropCapability whose rectangles are ``Rect`` tuples and whose
        ``pixel_aspect`` is numerator divided by denominator (true division).
    """

    def as_rect(rect):
        # Copy the four rectangle members into an immutable tuple.
        return Rect(rect.left, rect.top, rect.width, rect.height)

    aspect = crop.pixelaspect
    return CropCapability(
        type=BufferType(crop.type),
        bounds=as_rect(crop.bounds),
        defrect=as_rect(crop.defrect),
        pixel_aspect=aspect.numerator / aspect.denominator,
    )


# Alternate constructor: ``CropCapability.from_raw(crop)``.
CropCapability.from_raw = raw_crop_caps_to_crop_caps
def raw_read_crop_capabilities(fd, buffer_type: BufferType) -> raw.v4l2_cropcap:
    """Issue the CROPCAP ioctl on *fd* for *buffer_type*.

    Returns:
        Whatever the ``ioctl`` helper returns for the prepared
        ``raw.v4l2_cropcap`` request.
    """
    request = raw.v4l2_cropcap()
    request.type = buffer_type
    result = ioctl(fd, IOC.CROPCAP, request)
    return result
def read_crop_capabilities(fd, buffer_type: BufferType) -> Optional[CropCapability]:
    """Read the crop capabilities of *fd* for *buffer_type*.

    Returns:
        The translated ``CropCapability`` or ``None`` when the ioctl fails
        with ``ENODATA``.

    Raises:
        OSError: any ioctl failure other than ``ENODATA``.
    """
    try:
        crop = raw_read_crop_capabilities(fd, buffer_type)
    except OSError as error:
        # ENODATA is reported to the caller as "no crop capabilities".
        if error.errno == errno.ENODATA:
            return None
        raise
    return raw_crop_caps_to_crop_caps(crop)
# errno values that silently end an enumeration in iter_read().
ITER_BREAK = (errno.ENOTTY, errno.ENODATA, errno.EPIPE)


def iter_read(fd, ioc, indexed_struct, start=0, stop=128, step=1, ignore_einval=False):
    """Enumerate by repeating ioctl *ioc* with increasing ``index`` values.

    For every index in ``range(start, stop, step)`` the index is stored in
    *indexed_struct*, the ioctl is issued and the very same *indexed_struct*
    object is yielded. Callers that need to keep an item must copy it before
    advancing the iteration.

    Args:
        fd: file descriptor handed to ``ioctl``.
        ioc: ioctl request code.
        indexed_struct: request structure with an ``index`` member.
        start, stop, step: index range to probe.
        ignore_einval: when true an ``EINVAL`` failure skips the index;
            otherwise it ends the iteration.

    Raises:
        OSError: for any errno other than ``EINVAL`` or one of ``ITER_BREAK``.
    """
    for position in range(start, stop, step):
        indexed_struct.index = position
        try:
            ioctl(fd, ioc, indexed_struct)
            yield indexed_struct
        except OSError as error:
            code = error.errno
            if code == errno.EINVAL and ignore_einval:
                continue
            if code == errno.EINVAL or code in ITER_BREAK:
                break
            raise
def _interval_to_fps(interval):
    """Convert a raw frame interval fraction into frames per second (1/interval).

    An interval with a zero numerator has no reciprocal and is reported as 0.
    """
    if interval.numerator == 0:
        return 0
    return fractions.Fraction(interval.denominator, interval.numerator)


def iter_read_frame_intervals(fd, fmt, w, h):
    """Yield a ``FrameType`` for each frame interval of a format and frame size.

    Args:
        fd: file descriptor handed to the enumeration helper.
        fmt: pixel format stored in the request and in every result.
        w: frame width.
        h: frame height.

    Yields:
        FrameType with the frame rates expressed as exact
        ``fractions.Fraction`` values (or 0 when the interval numerator is
        zero). For discrete intervals min, max and step are the same value.
        If nothing could be enumerated, a single discrete ``FrameType`` with
        all rates set to 0 is yielded so the frame size is still reported.
    """
    value = raw.v4l2_frmivalenum()
    value.pixel_format = fmt
    value.width = w
    value.height = h
    count = 0
    for val in iter_read(fd, IOC.ENUM_FRAMEINTERVALS, value):
        # values come in frame interval (fps = 1/interval)
        try:
            ftype = FrameIntervalType(val.type)
        except ValueError:
            # Unknown interval type: stop enumerating.
            break
        if ftype == FrameIntervalType.DISCRETE:
            # Build the fraction from both integers: going through a float
            # division would lose exactness (e.g. 30000/1001).
            min_fps = max_fps = step_fps = _interval_to_fps(val.discrete)
        else:
            min_fps = _interval_to_fps(val.stepwise.min)
            max_fps = _interval_to_fps(val.stepwise.max)
            step_fps = _interval_to_fps(val.stepwise.step)
        yield FrameType(
            type=ftype,
            pixel_format=fmt,
            width=w,
            height=h,
            min_fps=min_fps,
            max_fps=max_fps,
            step_fps=step_fps,
        )
        count += 1
    if not count:
        # If it wasn't possible to get frame interval, report discovered frame size anyway
        yield FrameType(
            type=FrameIntervalType.DISCRETE,
            pixel_format=fmt,
            width=w,
            height=h,
            min_fps=0,
            max_fps=0,
            step_fps=0,
        )
def iter_read_discrete_frame_sizes(fd, pixel_format):
    """Yield the enumerated frame sizes of *pixel_format* while they are discrete.

    The iteration ends at the first entry whose type is not
    ``FrameSizeType.DISCRETE``. The yielded object is the one produced by
    ``iter_read``.
    """
    request = raw.v4l2_frmsizeenum()
    request.index = 0
    request.pixel_format = pixel_format
    for frame_size in iter_read(fd, IOC.ENUM_FRAMESIZES, request):
        # The type is read from the request structure, as the ioctl fills it in place.
        if request.type != FrameSizeType.DISCRETE:
            return
        yield frame_size
def iter_read_pixel_formats_frame_intervals(fd, pixel_formats):
    """Yield the frame intervals of every discrete frame size of each pixel format."""
    for candidate in pixel_formats:
        for frame_size in iter_read_discrete_frame_sizes(fd, candidate):
            discrete = frame_size.discrete
            yield from iter_read_frame_intervals(fd, candidate, discrete.width, discrete.height)
def read_capabilities(fd):
    """Issue the QUERYCAP ioctl on *fd* and return the ``raw.v4l2_capability`` used for it."""
    capabilities = raw.v4l2_capability()
    ioctl(fd, IOC.QUERYCAP, capabilities)
    return capabilities
def iter_read_formats(fd, type):
    """Yield an ``ImageFormat`` for every format enumerated for buffer *type*.

    Metadata buffer types (``META_CAPTURE``/``META_OUTPUT``) are translated
    with ``MetaFormat``; every other buffer type is translated with
    ``PixelFormat``. Previously only ``VIDEO_CAPTURE`` and ``VIDEO_OUTPUT``
    were handled, so any other type (for instance the multi-planar ones)
    reached the ``ImageFormat`` construction with an unbound or stale
    ``pixel_format``.

    Formats unknown to the chosen enumeration are logged as a warning and
    skipped.

    Args:
        fd: file descriptor handed to the enumeration helper.
        type: buffer type whose formats are enumerated.
    """
    fmt_desc = raw.v4l2_fmtdesc()
    fmt_desc.type = type
    if type in {BufferType.META_CAPTURE, BufferType.META_OUTPUT}:
        format_enum = MetaFormat
        unknown_message = "ignored unknown meta format %s (%d)"
    else:
        format_enum = PixelFormat
        unknown_message = "ignored unknown pixel format %s (%d)"
    known_formats = set(format_enum)
    for fmt in iter_read(fd, IOC.ENUM_FMT, fmt_desc):
        pixel_fmt = fmt.pixelformat
        if pixel_fmt not in known_formats:
            log.warning(unknown_message, human_pixel_format(pixel_fmt), pixel_fmt)
            continue
        yield ImageFormat(
            type=type,
            flags=ImageFormatFlag(fmt.flags),
            description=fmt.description.decode(),
            pixel_format=format_enum(pixel_fmt),
        )
def iter_read_inputs(fd):
    """Yield an ``Input`` tuple for every entry enumerated with the ENUMINPUT ioctl."""
    for entry in iter_read(fd, IOC.ENUMINPUT, raw.v4l2_input()):
        yield Input(
            index=entry.index,
            name=entry.name.decode(),
            type=InputType(entry.type),
            audioset=bit_indexes(entry.audioset),
            tuner=entry.tuner,
            std=StandardID(entry.std),
            status=InputStatus(entry.status),
            capabilities=InputCapabilities(entry.capabilities),
        )
def iter_read_outputs(fd):
    """Yield an ``Output`` tuple for every entry enumerated with the ENUMOUTPUT ioctl."""
    for entry in iter_read(fd, IOC.ENUMOUTPUT, raw.v4l2_output()):
        yield Output(
            index=entry.index,
            name=entry.name.decode(),
            type=OutputType(entry.type),
            audioset=bit_indexes(entry.audioset),
            modulator=entry.modulator,
            std=StandardID(entry.std),
            capabilities=OutputCapabilities(entry.capabilities),
        )
def iter_read_video_standards(fd):
    """Yield a ``Standard`` tuple for every entry enumerated with the ENUMSTD ioctl.

    ``frameperiod`` is stored as ``Fraction(denominator, numerator)`` of the
    raw frame period, i.e. its reciprocal.
    """
    for entry in iter_read(fd, IOC.ENUMSTD, raw.v4l2_standard()):
        raw_period = entry.frameperiod
        inverted_period = fractions.Fraction(raw_period.denominator, raw_period.numerator)
        yield Standard(
            index=entry.index,
            id=StandardID(entry.id),
            name=entry.name.decode(),
            frameperiod=inverted_period,
            framelines=entry.framelines,
        )
def iter_read_controls(fd):
    """Yield a deep copy of every control found with the QUERY_EXT_CTRL ioctl.

    The query id starts as ``NEXT_CTRL | NEXT_COMPOUND`` and, after each
    control is yielded, the same two flags are OR-ed into the id left in the
    structure before the next query is issued.
    """
    next_flags = ControlFlag.NEXT_CTRL | ControlFlag.NEXT_COMPOUND
    query = raw.v4l2_query_ext_ctrl()
    query.id = next_flags
    for found in iter_read(fd, IOC.QUERY_EXT_CTRL, query):
        # Copy first: the structure is reused (and modified below) for the next query.
        snapshot = copy.deepcopy(found)
        yield snapshot
        found.id |= next_flags
def iter_read_menu(fd, ctrl):
    """Yield a deep copy of every menu entry of control *ctrl*.

    Indexes from the control minimum to its maximum (inclusive), advancing by
    its step, are probed with the QUERYMENU ioctl; indexes rejected with
    ``EINVAL`` are skipped.
    """
    query = raw.v4l2_querymenu()
    query.id = ctrl.id
    entries = iter_read(
        fd,
        IOC.QUERYMENU,
        query,
        start=ctrl._info.minimum,
        stop=ctrl._info.maximum + 1,
        step=ctrl._info.step,
        ignore_einval=True,
    )
    for entry in entries:
        # The query structure is reused between iterations, so hand out a copy.
        yield copy.deepcopy(entry)
def query_buffer(fd, buffer_type: BufferType, memory: Memory, index: int) -> raw.v4l2_buffer:
    """Issue the QUERYBUF ioctl for buffer *index* and return the buffer structure used."""
    request = raw.v4l2_buffer()
    request.index = index
    request.type = buffer_type
    request.memory = memory
    request.reserved = 0
    ioctl(fd, IOC.QUERYBUF, request)
    return request
def enqueue_buffer_raw(fd, buff: raw.v4l2_buffer) -> raw.v4l2_buffer:
    """Queue the prepared buffer *buff* with the QBUF ioctl and return it.

    When ``buff.timestamp.secs`` is zero the timestamp is first filled by
    calling ``buff.timestamp.set_ns()`` without arguments.
    """
    stamp = buff.timestamp
    if stamp.secs == 0:
        buff.timestamp.set_ns()
    ioctl(fd, IOC.QBUF, buff)
    return buff
def enqueue_buffer(fd, buffer_type: BufferType, memory: Memory, size: int, index: int) -> raw.v4l2_buffer:
    """Build a buffer structure for buffer *index* and queue it.

    Args:
        fd: file descriptor of the device.
        buffer_type: stored in the ``type`` member.
        memory: stored in the ``memory`` member.
        size: stored in the ``bytesused`` member.
        index: stored in the ``index`` member.

    Returns:
        The structure returned by ``enqueue_buffer_raw``.
    """
    request = raw.v4l2_buffer()
    request.index = index
    request.type = buffer_type
    request.memory = memory
    request.bytesused = size
    request.field = Field.NONE
    request.reserved = 0
    return enqueue_buffer_raw(fd, request)
def dequeue_buffer(fd, buffer_type: BufferType, memory: Memory) -> raw.v4l2_buffer:
    """Issue the DQBUF ioctl and return the buffer structure used for it."""
    request = raw.v4l2_buffer()
    request.index = 0
    request.type = buffer_type
    request.memory = memory
    request.reserved = 0
    ioctl(fd, IOC.DQBUF, request)
    return request
def request_buffers(fd, buffer_type: BufferType, memory: Memory, count: int) -> raw.v4l2_requestbuffers:
    """Request *count* buffers with the REQBUFS ioctl.

    Returns:
        The request structure after the ioctl.

    Raises:
        OSError: when the structure reports a buffer count of zero afterwards.
    """
    request = raw.v4l2_requestbuffers(type=buffer_type, memory=memory, count=count)
    ioctl(fd, IOC.REQBUFS, request)
    if request.count == 0:
        raise OSError("Not enough buffer memory")
    return request
def free_buffers(fd, buffer_type: BufferType, memory: Memory) -> raw.v4l2_requestbuffers:
    """Issue the REQBUFS ioctl with a count of zero and return the request structure."""
    request = raw.v4l2_requestbuffers(type=buffer_type, memory=memory, count=0)
    ioctl(fd, IOC.REQBUFS, request)
    return request
def export_buffer(fd, buffer_type: BufferType, index: int) -> int:
    """Issue the EXPBUF ioctl for buffer *index* and return the ``fd`` member of the result."""
    request = raw.v4l2_exportbuffer(type=buffer_type, index=index)
    exported = ioctl(fd, IOC.EXPBUF, request)
    return exported.fd
def create_buffers(fd, format: raw.v4l2_format, memory: Memory, count: int) -> raw.v4l2_create_buffers:
    """Create buffers for Memory Mapped or User Pointer or DMA Buffer I/O.

    Issues the CREATE_BUFS ioctl for *count* buffers of *format*.

    Returns:
        The request structure after the ioctl.

    Raises:
        OSError: when the structure reports a buffer count of zero afterwards.
    """
    request = raw.v4l2_create_buffers()
    request.count = count
    request.memory = memory
    request.format = format
    ioctl(fd, IOC.CREATE_BUFS, request)
    if request.count == 0:
        raise OSError("Not enough buffer memory")
    return request
def set_raw_format(fd, fmt: raw.v4l2_format):
    """Issue the S_FMT ioctl with *fmt* and return the result of the ioctl call."""
    result = ioctl(fd, IOC.S_FMT, fmt)
    return result
def set_format(fd, buffer_type: BufferType, width: int, height: int, pixel_format: str = "MJPG"):
    """Set the format of *buffer_type* to the given frame size and pixel format.

    Args:
        fd: file descriptor of the device.
        buffer_type: buffer type the format applies to.
        width: frame width.
        height: frame height.
        pixel_format: a string whose characters are passed to
            ``raw.v4l2_fourcc``, or an already encoded value used as is.

    Returns:
        The result of ``set_raw_format``.
    """
    fmt = raw.v4l2_format()
    fourcc = raw.v4l2_fourcc(*pixel_format) if isinstance(pixel_format, str) else pixel_format
    fmt.type = buffer_type
    pix = fmt.fmt.pix
    pix.pixelformat = fourcc
    pix.field = Field.ANY
    pix.width = width
    pix.height = height
    # Line stride and image size are requested as zero.
    pix.bytesperline = 0
    pix.sizeimage = 0
    return set_raw_format(fd, fmt)
def get_raw_format(fd, buffer_type) -> raw.v4l2_format:
    """Issue the G_FMT ioctl for *buffer_type* and return the format structure used."""
    request = raw.v4l2_format()
    request.type = buffer_type
    ioctl(fd, IOC.G_FMT, request)
    return request
def get_format(fd, buffer_type) -> Union[Format, MetaFmt]:
    """Read the current format of *buffer_type*.

    Returns:
        ``MetaFmt`` for the metadata buffer types (``META_CAPTURE`` and
        ``META_OUTPUT``), ``Format`` built from the ``pix`` member otherwise.
    """
    raw_format = get_raw_format(fd, buffer_type)
    if buffer_type not in {BufferType.META_CAPTURE, BufferType.META_OUTPUT}:
        pix = raw_format.fmt.pix
        return Format(
            width=pix.width,
            height=pix.height,
            pixel_format=PixelFormat(pix.pixelformat),
            size=pix.sizeimage,
        )
    meta = raw_format.fmt.meta
    return MetaFmt(
        format=MetaFormat(meta.dataformat),
        max_buffer_size=meta.buffersize,
        width=meta.width,
        height=meta.height,
        bytes_per_line=meta.bytesperline,
    )
def try_raw_format(fd, fmt: raw.v4l2_format):
    """Issue the TRY_FMT ioctl with *fmt* and return the result of the ioctl call.

    The result is returned (as ``set_raw_format`` does) so that callers, and
    in particular ``try_format`` which forwards this return value, receive
    the outcome instead of ``None``.
    """
    return ioctl(fd, IOC.TRY_FMT, fmt)
def try_format(fd, buffer_type: BufferType, width: int, height: int, pixel_format: str = "MJPG"):
    """Try (TRY_FMT) the given frame size and pixel format for *buffer_type*.

    Args:
        fd: file descriptor of the device.
        buffer_type: buffer type the format applies to.
        width: frame width.
        height: frame height.
        pixel_format: a string whose characters are passed to
            ``raw.v4l2_fourcc``, or an already encoded value used as is.

    Returns:
        Whatever ``try_raw_format`` returns.
    """
    fmt = raw.v4l2_format()
    fourcc = raw.v4l2_fourcc(*pixel_format) if isinstance(pixel_format, str) else pixel_format
    fmt.type = buffer_type
    pix = fmt.fmt.pix
    pix.pixelformat = fourcc
    pix.field = Field.ANY
    pix.width = width
    pix.height = height
    # Line stride and image size are requested as zero.
    pix.bytesperline = 0
    pix.sizeimage = 0
    return try_raw_format(fd, fmt)
def get_parm(fd, buffer_type):
    """Issue the G_PARM ioctl for *buffer_type* and return the ``raw.v4l2_streamparm`` used."""
    stream_parm = raw.v4l2_streamparm()
    stream_parm.type = buffer_type
    ioctl(fd, IOC.G_PARM, stream_parm)
    return stream_parm
def set_fps(fd, buffer_type, fps):
    """Set the frame rate of *buffer_type* with the S_PARM ioctl.

    *fps* is converted to a fraction with a bounded denominator and written
    inverted (time per frame = 1/fps) into the capture or output parameters.

    Returns:
        The result of the ioctl call.

    Raises:
        ValueError: if *buffer_type* is neither ``VIDEO_CAPTURE`` nor
            ``VIDEO_OUTPUT``.
    """
    # v4l2 fraction is u32
    denominator_limit = int(min(2**32, 2**32 / fps))
    stream_parm = raw.v4l2_streamparm()
    stream_parm.type = buffer_type
    rate = fractions.Fraction(fps).limit_denominator(denominator_limit)
    if buffer_type == BufferType.VIDEO_CAPTURE:
        section = stream_parm.parm.capture
    elif buffer_type == BufferType.VIDEO_OUTPUT:
        section = stream_parm.parm.output
    else:
        raise ValueError(f"Unsupported buffer type {buffer_type!r}")
    # time per frame is the reciprocal of the rate
    section.timeperframe.numerator = rate.denominator
    section.timeperframe.denominator = rate.numerator
    return ioctl(fd, IOC.S_PARM, stream_parm)
def get_fps(fd, buffer_type):
    """Return the frame rate of *buffer_type* as a ``fractions.Fraction``.

    The rate is the reciprocal of the time per frame read with ``get_parm``.

    Raises:
        ValueError: if *buffer_type* is neither ``VIDEO_CAPTURE`` nor
            ``VIDEO_OUTPUT``.
    """
    stream_parm = get_parm(fd, buffer_type)
    if buffer_type == BufferType.VIDEO_CAPTURE:
        section = stream_parm.parm.capture
    elif buffer_type == BufferType.VIDEO_OUTPUT:
        section = stream_parm.parm.output
    else:
        raise ValueError(f"Unsupported buffer type {buffer_type!r}")
    period = section.timeperframe
    return fractions.Fraction(period.denominator, period.numerator)
def stream_on(fd, buffer_type):
    """Issue the STREAMON ioctl for *buffer_type* and return the result of the ioctl call."""
    return ioctl(fd, IOC.STREAMON, cenum(buffer_type))
def stream_off(fd, buffer_type):
    """Issue the STREAMOFF ioctl for *buffer_type* and return the result of the ioctl call."""
    return ioctl(fd, IOC.STREAMOFF, cenum(buffer_type))
def set_selection(fd, buffer_type, target, rectangle):
    """Issue the S_SELECTION ioctl with the given target and rectangle.

    Args:
        fd: file descriptor of the device.
        buffer_type: stored in the ``type`` member.
        target: stored in the ``target`` member.
        rectangle: object with ``left``, ``top``, ``width`` and ``height``.
    """
    selection = raw.v4l2_selection()
    selection.type = buffer_type
    selection.target = target
    area = selection.r
    area.left = rectangle.left
    area.top = rectangle.top
    area.width = rectangle.width
    area.height = rectangle.height
    ioctl(fd, IOC.S_SELECTION, selection)
def get_selection(
    fd,
    buffer_type: BufferType,
    target: SelectionTarget = SelectionTarget.CROP,
) -> Rect:
    """Issue the G_SELECTION ioctl and return the selection rectangle as a ``Rect``."""
    selection = raw.v4l2_selection()
    selection.type = buffer_type
    selection.target = target
    ioctl(fd, IOC.G_SELECTION, selection)
    area = selection.r
    return Rect(left=area.left, top=area.top, width=area.width, height=area.height)
def get_control(fd, id):
    """Issue the G_CTRL ioctl for control *id* and return the ``value`` member afterwards."""
    request = raw.v4l2_control(id)
    ioctl(fd, IOC.G_CTRL, request)
    return request.value
# Element ctypes type used for payload controls of these control types.
CTRL_TYPE_CTYPE_ARRAY = {
    ControlType.U8: ctypes.c_uint8,
    ControlType.U16: ctypes.c_uint16,
    ControlType.U32: ctypes.c_uint32,
    ControlType.INTEGER: ctypes.c_int,
    ControlType.INTEGER64: ctypes.c_int64,
}

# Cache of control type -> struct class, filled lazily by get_ctrl_type_struct().
CTRL_TYPE_CTYPE_STRUCT = {
    # ControlType.AREA: raw.v4l2_area,
}
def _struct_for_ctrl_type(ctrl_type):
    """Look up the ``raw`` struct matching a control type by naming convention.

    With ``<type>`` being the lower cased ``ControlType`` member name,
    ``raw.v4l2_ctrl_<type>`` is preferred and ``raw.v4l2_<type>`` is the
    fallback.

    Raises:
        AttributeError: if ``raw`` has neither attribute.
    """
    suffix = ControlType(ctrl_type).name.lower()
    preferred = f"v4l2_ctrl_{suffix}"
    if hasattr(raw, preferred):
        return getattr(raw, preferred)
    return getattr(raw, f"v4l2_{suffix}")
def get_ctrl_type_struct(ctrl_type):
    """Return the struct class for *ctrl_type*, caching the lookup result.

    Results are stored in ``CTRL_TYPE_CTYPE_STRUCT`` so the name based lookup
    runs once per control type.
    """
    cached = CTRL_TYPE_CTYPE_STRUCT.get(ctrl_type)
    if cached is not None:
        return cached
    resolved = _struct_for_ctrl_type(ctrl_type)
    CTRL_TYPE_CTYPE_STRUCT[ctrl_type] = resolved
    return resolved
def convert_to_ctypes_array(lst, depth, ctype):
    """Convert a list (arbitrary depth) to a ctypes array.

    Args:
        lst: list of values, or list of nested lists when *depth* > 1.
        depth: nesting depth of *lst* (1 for a flat list).
        ctype: ctypes type of the innermost elements.

    Returns:
        A ctypes array with one nesting level per depth level.
    """
    if depth != 1:
        # Convert every sub-list first, then wrap them in an array of arrays.
        rows = [convert_to_ctypes_array(row, depth - 1, ctype) for row in lst]
        row_type = type(rows[0])
        return (row_type * len(rows))(*rows)
    return (ctype * len(lst))(*lst)
def _prepare_read_control_value(control: raw.v4l2_query_ext_ctrl, raw_control: raw.v4l2_ext_control):
    """Prepare *raw_control* for reading the value of *control*.

    The control id is always copied. For controls flagged ``HAS_PAYLOAD`` a
    receiving buffer is allocated and attached to *raw_control* together with
    its size.

    Returns:
        The allocated payload object (string buffer, struct instance or
        ctypes array), or ``None`` for controls without payload. The caller
        has to keep a reference to it while the ioctl runs.
    """
    raw_control.id = control.id
    if ControlFlag.HAS_PAYLOAD not in ControlFlag(control.flags):
        return None

    if control.type == ControlType.STRING:
        # One extra byte on top of the control maximum.
        length = control.maximum + 1
        payload = ctypes.create_string_buffer(length)
        raw_control.string = payload
        raw_control.size = length
        return payload

    element_type = CTRL_TYPE_CTYPE_ARRAY.get(control.type)
    raw_control.size = control.elem_size * control.elems
    if element_type is None:
        # Not an array of plain numbers: use the struct registered for the type.
        struct_type = get_ctrl_type_struct(control.type)
        payload = struct_type()
        raw_control.ptr = ctypes.cast(ctypes.pointer(payload), ctypes.c_void_p)
        return payload

    # NOTE(review): dimensions are applied in index order, so dims[0] ends up
    # as the innermost array length -- confirm this is the intended layout.
    array_type = element_type
    for dimension in range(control.nr_of_dims):
        array_type *= control.dims[dimension]
    payload = array_type()
    raw_control.ptr = ctypes.cast(payload, ctypes.c_void_p)
    return payload
def _get_control_value(control: raw.v4l2_query_ext_ctrl, raw_control: raw.v4l2_ext_control, data):
    """Extract the value of *control* after it has been read.

    Args:
        control: control description; its ``type`` selects the conversion.
        raw_control: structure holding the value of controls without payload.
        data: payload object prepared for the read, or ``None``.

    Returns:
        Without payload: ``value64`` for ``INTEGER64`` controls, ``value``
        otherwise. With payload: the decoded text for ``STRING`` controls,
        the payload object itself otherwise.
    """
    if data is not None:
        return data.value.decode() if control.type == ControlType.STRING else data
    if control.type == ControlType.INTEGER64:
        return raw_control.value64
    return raw_control.value
def get_controls_values(fd, controls: list[raw.v4l2_query_ext_ctrl], which=raw.ControlWhichValue.CUR_VAL, request_fd=0):
    """Read the values of several controls with one G_EXT_CTRLS ioctl.

    Args:
        fd: file descriptor of the device.
        controls: descriptions of the controls to read.
        which: stored in the ``which`` member of the request.
        request_fd: stored in the ``request_fd`` member of the request.

    Returns:
        List with one value per control, in the order of *controls*.
    """
    count = len(controls)
    request = raw.v4l2_ext_controls()
    request.which = which
    request.count = count
    request.request_fd = request_fd
    request.controls = (count * raw.v4l2_ext_control)()
    # The payload objects must stay referenced until the ioctl has filled them.
    payloads = []
    for control, raw_control in zip(controls, request.controls):
        payloads.append(_prepare_read_control_value(control, raw_control))
    ioctl(fd, IOC.G_EXT_CTRLS, request)
    return [
        _get_control_value(control, raw_control, payload)
        for control, raw_control, payload in zip(controls, request.controls, payloads)
    ]
def set_control(fd, id, value):
    """Issue the S_CTRL ioctl with a ``raw.v4l2_control`` built from *id* and *value*."""
    request = raw.v4l2_control(id, value)
    ioctl(fd, IOC.S_CTRL, request)
def _prepare_write_controls_values(control: raw.v4l2_query_ext_ctrl, value: object, raw_control: raw.v4l2_ext_control):
    """Fill *raw_control* so that *value* can be written to *control*.

    Args:
        control: description of the control being written.
        value: new value. A ``str`` for string controls, a (nested) list for
            array controls, a raw struct instance for struct payloads, or a
            plain number for controls without payload.
        raw_control: structure to fill in.
    """
    raw_control.id = control.id
    has_payload = ControlFlag.HAS_PAYLOAD in ControlFlag(control.flags)
    if has_payload:
        if control.type == ControlType.STRING:
            # Size the payload from the encoded bytes: the number of
            # characters differs from the number of bytes for non-ASCII text.
            encoded = value.encode()
            raw_control.string = ctypes.create_string_buffer(encoded)
            raw_control.size = len(encoded) + 1
        else:
            array_type = CTRL_TYPE_CTYPE_ARRAY.get(control.type)
            raw_control.size = control.elem_size * control.elems
            # a struct: assume value is proper raw struct
            if array_type is None:
                value = ctypes.pointer(value)
            else:
                value = convert_to_ctypes_array(value, control.nr_of_dims, array_type)
            ptr = ctypes.cast(value, ctypes.c_void_p)
            raw_control.ptr = ptr
    else:
        if control.type == ControlType.INTEGER64:
            raw_control.value64 = value
        else:
            raw_control.value = value
def set_controls_values(
    fd, controls_values: list[tuple[raw.v4l2_query_ext_ctrl, object]], which=raw.ControlWhichValue.CUR_VAL, request_fd=0
):
    """Write the values of several controls with one S_EXT_CTRLS ioctl.

    Args:
        fd: file descriptor of the device.
        controls_values: ``(control description, new value)`` pairs.
        which: stored in the ``which`` member of the request.
        request_fd: stored in the ``request_fd`` member of the request.
    """
    count = len(controls_values)
    request = raw.v4l2_ext_controls()
    request.which = which
    request.count = count
    request.request_fd = request_fd
    request.controls = (count * raw.v4l2_ext_control)()
    for pair, raw_control in zip(controls_values, request.controls):
        control, value = pair
        _prepare_write_controls_values(control, value, raw_control)
    ioctl(fd, IOC.S_EXT_CTRLS, request)
def get_priority(fd) -> Priority:
    """Issue the G_PRIORITY ioctl and return the value read as a ``Priority``."""
    holder = ctypes.c_uint()
    ioctl(fd, IOC.G_PRIORITY, holder)
    return Priority(holder.value)
def set_priority(fd, priority: Priority):
    """Issue the S_PRIORITY ioctl with the value of *priority*."""
    holder = ctypes.c_uint(priority.value)
    ioctl(fd, IOC.S_PRIORITY, holder)
def subscribe_event(
    fd,
    event_type: EventType,
    id: int = 0,
    flags: EventSubscriptionFlag = 0,
):
    """Issue the SUBSCRIBE_EVENT ioctl for the given event type, id and flags."""
    subscription = raw.v4l2_event_subscription()
    subscription.flags = flags
    subscription.id = id
    subscription.type = event_type
    ioctl(fd, IOC.SUBSCRIBE_EVENT, subscription)
def unsubscribe_event(fd, event_type: EventType = EventType.ALL, id: int = 0):
    """Issue the UNSUBSCRIBE_EVENT ioctl for the given event type (all by default) and id."""
    subscription = raw.v4l2_event_subscription()
    subscription.id = id
    subscription.type = event_type
    ioctl(fd, IOC.UNSUBSCRIBE_EVENT, subscription)
def deque_event(fd) -> raw.v4l2_event:
    """Issue the DQEVENT ioctl with a fresh ``raw.v4l2_event`` and return the ioctl result."""
    pending = raw.v4l2_event()
    result = ioctl(fd, IOC.DQEVENT, pending)
    return result
def set_edid(fd, edid):
    """Write *edid* to the device with the S_EDID ioctl.

    Args:
        fd: file descriptor of the device.
        edid: EDID data made of whole 128 byte blocks (may be empty).

    Raises:
        ValueError: if the length of *edid* is not a multiple of 128.
    """
    block_count, remainder = divmod(len(edid), 128)
    if remainder:
        raise ValueError(f"EDID length {len(edid)} is not multiple of 128")
    request = raw.v4l2_edid()
    request.pad = 0
    request.start_block = 0
    request.blocks = block_count
    data = create_string_buffer(edid)
    request.edid = cast(data, type(request.edid))
    ioctl(fd, IOC.S_EDID, request)
def clear_edid(fd):
    """Write an empty EDID (zero blocks) to the device."""
    empty_edid = b""
    set_edid(fd, empty_edid)
def get_edid(fd):
    """Read the EDID of the device with the G_EDID ioctl.

    A first call with an untouched request obtains the number of blocks; a
    second call with a buffer of ``128 * blocks`` bytes obtains the data.

    Returns:
        The EDID bytes, or ``b""`` when zero blocks are reported.
    """
    request = raw.v4l2_edid()
    ioctl(fd, IOC.G_EDID, request)
    block_count = request.blocks
    if block_count == 0:
        return b""
    size = 128 * block_count
    data = create_string_buffer(b"\0" * size)
    request.edid = cast(data, type(request.edid))
    ioctl(fd, IOC.G_EDID, request)
    return string_at(request.edid, size)
def get_input(fd):
    """Issue the G_INPUT ioctl and return the value read."""
    holder = ctypes.c_uint()
    ioctl(fd, IOC.G_INPUT, holder)
    return holder.value
def set_input(fd, index: int):
    """Issue the S_INPUT ioctl with *index*."""
    holder = ctypes.c_uint(index)
    ioctl(fd, IOC.S_INPUT, holder)
def get_output(fd):
    """Issue the G_OUTPUT ioctl and return the value read."""
    holder = ctypes.c_uint()
    ioctl(fd, IOC.G_OUTPUT, holder)
    return holder.value
def set_output(fd, index: int):
    """Issue the S_OUTPUT ioctl with *index*."""
    holder = ctypes.c_uint(index)
    ioctl(fd, IOC.S_OUTPUT, holder)
def get_std(fd) -> StandardID:
    """Issue the G_STD ioctl and return the value read as a ``StandardID``."""
    holder = ctypes.c_uint64()
    ioctl(fd, IOC.G_STD, holder)
    return StandardID(holder.value)
def set_std(fd, std):
    """Issue the S_STD ioctl with the video standard *std*.

    Args:
        fd: file descriptor of the device.
        std: the standard as an enumeration member, a plain integer or an
            already prepared ctypes object.

    Enumeration members and integers are wrapped in a ``ctypes.c_uint64``
    (the type used by ``get_std`` and ``query_std``) so that the ioctl
    receives a buffer like the other setters of this module pass; any other
    object is handed over unchanged.
    """
    if isinstance(std, enum.Enum):
        std = std.value
    if isinstance(std, int):
        std = ctypes.c_uint64(std)
    ioctl(fd, IOC.S_STD, std)
def query_std(fd) -> StandardID:
    """Issue the QUERYSTD ioctl and return the value read as a ``StandardID``."""
    holder = ctypes.c_uint64()
    ioctl(fd, IOC.QUERYSTD, holder)
    return StandardID(holder.value)
# Friendly representation of a sub-device format.
SubdevFormat = collections.namedtuple(
    "SubdevFormat",
    [
        "pad",
        "which",
        "width",
        "height",
        "code",
        "field",
        "colorspace",
        "quantization",
        "xfer_func",
        "flags",
        "stream",
    ],
)


def _translate_subdev_format(fmt: raw.v4l2_subdev_format):
    """Convert a raw sub-device format structure into a ``SubdevFormat``.

    Numeric members are wrapped in the matching ``raw`` enumerations; ``pad``,
    ``width``, ``height`` and ``stream`` are copied unchanged.
    """
    frame = fmt.format
    return SubdevFormat(
        pad=fmt.pad,
        which=raw.SubdevFormatWhence(fmt.which),
        width=frame.width,
        height=frame.height,
        code=raw.MbusPixelcode(frame.code),
        field=raw.Field(frame.field),
        colorspace=raw.Colorspace(frame.colorspace),
        quantization=raw.Quantization(frame.quantization),
        xfer_func=raw.XferFunc(frame.xfer_func),
        flags=raw.MbusFrameFormatFlag(frame.flags),
        stream=fmt.stream,
    )
def get_subdevice_format(fd, pad: int = 0) -> SubdevFormat:
    """Read the active format of *pad* with the SUBDEV_G_FMT ioctl.

    Returns:
        The ioctl result translated into a ``SubdevFormat`` tuple (the
        previous annotation named the raw structure, which is not what is
        returned).
    """
    fmt = raw.v4l2_subdev_format(pad=pad, which=raw.SubdevFormatWhence.ACTIVE)
    return _translate_subdev_format(ioctl(fd, IOC.SUBDEV_G_FMT, fmt))
850# Helpers
def request_and_query_buffer(fd, buffer_type: BufferType, memory: Memory) -> raw.v4l2_buffer:
    """Request a single buffer and return its queried description."""
    (only_buffer, *_) = request_and_query_buffers(fd, buffer_type, memory, 1)
    return only_buffer
def request_and_query_buffers(fd, buffer_type: BufferType, memory: Memory, count: int) -> list[raw.v4l2_buffer]:
    """Request *count* buffers and query each of them.

    Returns:
        The queried buffer structures for indexes ``0`` to ``count - 1``.
    """
    request_buffers(fd, buffer_type, memory, count)
    # NOTE(review): the indexes queried come from the requested count; the
    # count reported back by the request is not consulted here.
    queried = []
    for index in range(count):
        queried.append(query_buffer(fd, buffer_type, memory, index))
    return queried
def mmap_from_buffer(fd, buff: raw.v4l2_buffer) -> mmap.mmap:
    """Memory map the region described by *buff* (its ``length`` at ``m.offset``)."""
    length = buff.length
    offset = buff.m.offset
    return mem_map(fd, length, offset=offset)
def create_mmap_buffers(fd, buffer_type: BufferType, memory: Memory, count: int) -> list[mmap.mmap]:
    """create buffers + mmap_from_buffer

    Requests and queries *count* buffers, then memory maps each of them.
    """
    mappings = []
    for buff in request_and_query_buffers(fd, buffer_type, memory, count):
        mappings.append(mmap_from_buffer(fd, buff))
    return mappings
def create_mmap_buffer(fd, buffer_type: BufferType, memory: Memory) -> mmap.mmap:
    """Request, query and memory map a single buffer.

    Returns:
        The one ``mmap.mmap`` created. The one-element list produced by
        ``create_mmap_buffers`` is unwrapped so the result matches the
        declared return type, as ``request_and_query_buffer`` does for its
        plural counterpart.
    """
    return create_mmap_buffers(fd, buffer_type, memory, 1)[0]
def enqueue_buffers(fd, buffer_type: BufferType, memory: Memory, count: int) -> list[raw.v4l2_buffer]:
    """Queue the buffers with indexes ``0`` to ``count - 1``, each with a size of zero."""
    queued = []
    for index in range(count):
        queued.append(enqueue_buffer(fd, buffer_type, memory, 0, index))
    return queued
class Device(BaseDevice):
    """Video device: object oriented access to the module level V4L2 helpers.

    Most methods forward to the function of the same name using the file
    descriptor of the device (``self.fileno()``).
    """

    # Device path prefix (consumed by the base class, not shown here).
    PREFIX = "/dev/video"

    def __init__(self, name_or_file, read_write=True, io=IO):
        # Both attributes are filled by _on_open().
        self.info = None
        self.controls = None
        super().__init__(name_or_file, read_write=read_write, io=io)

    def __iter__(self):
        """Iterate over the frames of a ``VideoCapture`` created on this device."""
        with VideoCapture(self) as stream:
            yield from stream

    async def __aiter__(self):
        """Asynchronously iterate over the frames of a ``VideoCapture`` on this device."""
        with VideoCapture(self) as stream:
            async for frame in stream:
                yield frame

    def _on_open(self):
        """Create the ``info`` and ``controls`` helpers bound to this device."""
        self.info = InfoEx(self)
        self.controls = Controls.from_device(self)

    def query_buffer(self, buffer_type, memory, index):
        """See module level ``query_buffer``."""
        return query_buffer(self.fileno(), buffer_type, memory, index)

    def enqueue_buffer(self, buffer_type: BufferType, memory: Memory, size: int, index: int) -> raw.v4l2_buffer:
        """See module level ``enqueue_buffer``."""
        return enqueue_buffer(self.fileno(), buffer_type, memory, size, index)

    def dequeue_buffer(self, buffer_type: BufferType, memory: Memory) -> raw.v4l2_buffer:
        """See module level ``dequeue_buffer``."""
        return dequeue_buffer(self.fileno(), buffer_type, memory)

    def request_buffers(self, buffer_type, memory, size):
        """Request *size* buffers; see module level ``request_buffers``."""
        return request_buffers(self.fileno(), buffer_type, memory, size)

    def create_buffers(self, buffer_type: BufferType, memory: Memory, count: int) -> list[raw.v4l2_buffer]:
        """Request and query *count* buffers (``request_and_query_buffers``)."""
        return request_and_query_buffers(self.fileno(), buffer_type, memory, count)

    def free_buffers(self, buffer_type, memory):
        """See module level ``free_buffers``."""
        return free_buffers(self.fileno(), buffer_type, memory)

    def enqueue_buffers(self, buffer_type: BufferType, memory: Memory, count: int) -> list[raw.v4l2_buffer]:
        """See module level ``enqueue_buffers``."""
        return enqueue_buffers(self.fileno(), buffer_type, memory, count)

    def set_format(
        self,
        buffer_type: BufferType,
        width: int,
        height: int,
        pixel_format: str = "MJPG",
    ):
        """See module level ``set_format``."""
        return set_format(self.fileno(), buffer_type, width, height, pixel_format=pixel_format)

    def get_format(self, buffer_type) -> Format:
        """See module level ``get_format``."""
        return get_format(self.fileno(), buffer_type)

    def set_fps(self, buffer_type, fps):
        """See module level ``set_fps``."""
        return set_fps(self.fileno(), buffer_type, fps)

    def get_fps(self, buffer_type):
        """See module level ``get_fps``."""
        return get_fps(self.fileno(), buffer_type)

    def set_selection(self, buffer_type, target, rectangle):
        """See module level ``set_selection``."""
        return set_selection(self.fileno(), buffer_type, target, rectangle)

    def get_selection(self, buffer_type, target: SelectionTarget = SelectionTarget.CROP):
        """See module level ``get_selection``; *target* has the same default as there."""
        return get_selection(self.fileno(), buffer_type, target)

    def get_priority(self) -> Priority:
        """See module level ``get_priority``."""
        return get_priority(self.fileno())

    def set_priority(self, priority: Priority):
        """See module level ``set_priority``."""
        set_priority(self.fileno(), priority)

    def stream_on(self, buffer_type):
        """Start streaming for *buffer_type*, logging before and after."""
        self.log.info("Starting %r stream...", buffer_type.name)
        stream_on(self.fileno(), buffer_type)
        self.log.info("%r stream ON", buffer_type.name)

    def stream_off(self, buffer_type):
        """Stop streaming for *buffer_type*, logging before and after."""
        self.log.info("Stopping %r stream...", buffer_type.name)
        stream_off(self.fileno(), buffer_type)
        self.log.info("%r stream OFF", buffer_type.name)

    def write(self, data: bytes) -> None:
        """Write *data* to the underlying file object."""
        self._fobj.write(data)

    def subscribe_event(
        self,
        event_type: EventType = EventType.ALL,
        id: int = 0,
        flags: EventSubscriptionFlag = 0,
    ):
        """See module level ``subscribe_event``; all events by default."""
        return subscribe_event(self.fileno(), event_type, id, flags)

    def unsubscribe_event(self, event_type: EventType = EventType.ALL, id: int = 0):
        """See module level ``unsubscribe_event``."""
        return unsubscribe_event(self.fileno(), event_type, id)

    def deque_event(self):
        """See module level ``deque_event``."""
        return deque_event(self.fileno())

    def set_edid(self, edid):
        """See module level ``set_edid``."""
        set_edid(self.fileno(), edid)

    def clear_edid(self):
        """See module level ``clear_edid``."""
        clear_edid(self.fileno())

    def get_edid(self):
        """See module level ``get_edid``."""
        return get_edid(self.fileno())

    def get_input(self):
        """See module level ``get_input``."""
        return get_input(self.fileno())

    def set_input(self, index: int):
        """See module level ``set_input``."""
        return set_input(self.fileno(), index)

    def get_output(self):
        """See module level ``get_output``."""
        return get_output(self.fileno())

    def set_output(self, index: int):
        """See module level ``set_output``."""
        return set_output(self.fileno(), index)

    def get_std(self) -> StandardID:
        """See module level ``get_std``."""
        return get_std(self.fileno())

    def set_std(self, std):
        """See module level ``set_std``."""
        return set_std(self.fileno(), std)

    def query_std(self) -> StandardID:
        """See module level ``query_std``."""
        return query_std(self.fileno())
class SubDevice(BaseDevice):
    """Device exposing the sub-device format query."""

    def get_format(self, pad: int = 0) -> SubdevFormat:
        """Return the active format of *pad* as a ``SubdevFormat``.

        The device object itself (not its file descriptor number) is handed
        to ``get_subdevice_format``.
        """
        subdev_format = get_subdevice_format(self, pad=pad)
        return subdev_format
def create_artificial_control_class(class_id):
    """Build a stand-in control class description for *class_id*.

    Returns:
        A ``raw.v4l2_query_ext_ctrl`` of type ``CTRL_CLASS`` named
        "Generic Controls" whose id is ``class_id | 1``.
    """
    artificial_id = class_id | 1
    return raw.v4l2_query_ext_ctrl(
        id=artificial_id,
        name=b"Generic Controls",
        type=ControlType.CTRL_CLASS,
    )
class Controls(dict):
    """Lazily loaded mapping of control id to control object for a device.

    The controls are loaded from ``device.info.controls`` the first time the
    mapping is accessed through one of the overridden methods. Besides the id,
    a control can be looked up by its ``config_name``, either as a key or as
    an attribute.
    """

    def __init__(self, device: Device):
        super().__init__()
        # Stored straight into __dict__: __setattr__ is overridden and would
        # otherwise store these as mapping items.
        self.__dict__.update(_device=device, _initialized=False)

    def _init_if_needed(self):
        """Load the controls on first use."""
        if self._initialized:
            return
        self._load()
        self.__dict__["_initialized"] = True

    def __getitem__(self, name):
        self._init_if_needed()
        return super().__getitem__(name)

    def __len__(self):
        self._init_if_needed()
        return super().__len__()

    def _load(self):
        """Create one control object per control reported by the device info.

        Control class entries (type ``CTRL_CLASS``) are remembered per class
        id and not stored as controls. A control whose class has not been
        seen gets an artificial class. Controls flagged ``HAS_PAYLOAD`` become
        ``CompoundControl``; the others are chosen by type and fall back to
        ``GenericControl``.
        """
        type_to_class = {
            ControlType.BOOLEAN: BooleanControl,
            ControlType.INTEGER: IntegerControl,
            ControlType.INTEGER64: Integer64Control,
            ControlType.MENU: MenuControl,
            ControlType.INTEGER_MENU: MenuControl,
            ControlType.U8: U8Control,
            ControlType.U16: U16Control,
            ControlType.U32: U32Control,
            ControlType.BUTTON: ButtonControl,
        }
        known_classes = {}
        for ctrl in self._device.info.controls:
            ctrl_type = ControlType(ctrl.type)
            class_id = V4L2_CTRL_ID2CLASS(ctrl.id)
            if ctrl_type == ControlType.CTRL_CLASS:
                known_classes[class_id] = ctrl
                continue
            owner = known_classes.get(class_id)
            if owner is None:
                owner = create_artificial_control_class(class_id)
                known_classes[class_id] = owner
            if ControlFlag.HAS_PAYLOAD in ControlFlag(ctrl.flags):
                factory = CompoundControl
            else:
                factory = type_to_class.get(ctrl_type, GenericControl)
            self[ctrl.id] = factory(self._device, ctrl, owner)

    @classmethod
    def from_device(cls, device):
        """Deprecated: backward compatible. Please use Controls(device) constructor directly"""
        return cls(device)

    def __getattr__(self, key):
        # Attribute access falls back to item lookup (id or config name).
        try:
            return self[key]
        except KeyError:
            pass
        raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{key}'")

    def __setattr__(self, key, value):
        # Attribute assignment stores an item in the mapping.
        self._init_if_needed()
        self[key] = value

    def __missing__(self, key):
        # Reached when *key* is not a stored key: search by config name.
        self._init_if_needed()
        for candidate in self.values():
            if not isinstance(candidate, BaseControl):
                continue
            if candidate.config_name == key:
                return candidate
        raise KeyError(key)

    def values(self):
        self._init_if_needed()
        return super().values()

    def used_classes(self):
        """Return the distinct control classes of the stored controls."""
        by_id = {}
        for item in self.values():
            if isinstance(item, BaseControl):
                by_id[item.control_class.id] = item.control_class
        return list(by_id.values())

    def with_class(self, control_class):
        """Yield the controls that belong to *control_class*.

        *control_class* may be a class name (case insensitive), an integer,
        a ``ControlClass`` member, or any object whose ``id`` minus one is a
        ``ControlClass`` value.
        """
        if isinstance(control_class, str):
            control_class = ControlClass[control_class.upper()]
        elif isinstance(control_class, int):
            control_class = ControlClass(control_class)
        elif not isinstance(control_class, ControlClass):
            control_class = ControlClass(control_class.id - 1)
        for item in self.values():
            if isinstance(item, BaseControl) and item.control_class.id - 1 == control_class:
                yield item

    def set_to_default(self):
        """Call ``set_to_default`` on every control, ignoring ``AttributeError``."""
        for item in self.values():
            if isinstance(item, BaseControl):
                with contextlib.suppress(AttributeError):
                    item.set_to_default()

    def set_clipping(self, clipping: bool) -> None:
        """Set the ``clipping`` attribute of every numeric control."""
        numeric_controls = (item for item in self.values() if isinstance(item, BaseNumericControl))
        for control in numeric_controls:
            control.clipping = clipping
class BaseControl:
    """Common state and helpers shared by every control of a device.

    Wraps the raw control description (`info`) and exposes its id, decoded
    name, type and flags, plus one ``is_flagged_*`` property per control flag.
    """

    def __init__(self, device, info, control_class):
        self.device = device
        self._info = info
        self.id = info.id
        self.name = info.name.decode()
        self._config_name = None  # computed on demand by config_name
        self.control_class = control_class
        self.type = ControlType(info.type)
        self.flags = ControlFlag(info.flags)

        # `standard` is the matching ControlID member, or None when the id is
        # not a member of ControlID.
        try:
            standard = ControlID(self.id)
        except ValueError:
            standard = None
        self.standard = standard

    def __repr__(self):
        text = str(self.config_name)

        extra = self._get_repr().strip()
        if extra:
            text += f" {extra}"

        if self.flags:
            raw_flags = self._info.flags
            names = [flag.name.lower() for flag in ControlFlag if (raw_flags & flag) == flag]
            text += " flags=" + ",".join(names)

        return f"<{type(self).__name__} {text}>"

    def _get_repr(self) -> str:
        """Hook for subclasses: extra text to include in repr()."""
        return ""

    def _get_control(self):
        """Read the current raw value of this control from the device."""
        values = get_controls_values(self.device, (self._info,))
        return values[0]

    def _set_control(self, value):
        """Write `value` to the device.

        Raises AttributeError, listing the offending flags, when the control
        is not writeable.
        """
        if not self.is_writeable:
            checks = (
                (self.is_flagged_read_only, "read-only"),
                (self.is_flagged_inactive, "inactive"),
                (self.is_flagged_disabled, "disabled"),
                (self.is_flagged_grabbed, "grabbed"),
            )
            reasons = [label for flagged, label in checks if flagged]
            raise AttributeError(f"{self.__class__.__name__} {self.config_name} is not writeable: {', '.join(reasons)}")
        set_controls_values(self.device, ((self._info, value),))

    @property
    def config_name(self) -> str:
        """Lower-case name with parentheses removed and ", " / " " turned into "_"."""
        if self._config_name is None:
            text = self.name.lower()
            for char in ("(", ")"):
                text = text.replace(char, "")
            for separator in (", ", " "):
                text = text.replace(separator, "_")
            self._config_name = text
        return self._config_name

    @property
    def is_flagged_disabled(self) -> bool:
        return ControlFlag.DISABLED in self.flags

    @property
    def is_flagged_grabbed(self) -> bool:
        return ControlFlag.GRABBED in self.flags

    @property
    def is_flagged_read_only(self) -> bool:
        return ControlFlag.READ_ONLY in self.flags

    @property
    def is_flagged_update(self) -> bool:
        return ControlFlag.UPDATE in self.flags

    @property
    def is_flagged_inactive(self) -> bool:
        return ControlFlag.INACTIVE in self.flags

    @property
    def is_flagged_slider(self) -> bool:
        return ControlFlag.SLIDER in self.flags

    @property
    def is_flagged_write_only(self) -> bool:
        return ControlFlag.WRITE_ONLY in self.flags

    @property
    def is_flagged_volatile(self) -> bool:
        return ControlFlag.VOLATILE in self.flags

    @property
    def is_flagged_has_payload(self) -> bool:
        return ControlFlag.HAS_PAYLOAD in self.flags

    @property
    def is_flagged_execute_on_write(self) -> bool:
        return ControlFlag.EXECUTE_ON_WRITE in self.flags

    @property
    def is_flagged_modify_layout(self) -> bool:
        return ControlFlag.MODIFY_LAYOUT in self.flags

    @property
    def is_flagged_dynamic_array(self) -> bool:
        return ControlFlag.DYNAMIC_ARRAY in self.flags

    @property
    def is_writeable(self) -> bool:
        """True unless the control is flagged read-only, disabled or grabbed."""
        blocked = self.is_flagged_read_only or self.is_flagged_disabled or self.is_flagged_grabbed
        return not blocked
class BaseMonoControl(BaseControl):
    """Control holding a single value.

    Subclasses customise `_convert_read`, `_convert_write` and
    `_mangle_write`; here all three return their argument unchanged.
    """

    def _get_repr(self) -> str:
        text = f" default={self.default}"
        if self.is_flagged_write_only:
            return text
        # Reading the value may fail; show the error instead of breaking repr().
        try:
            text += f" value={self.value}"
        except Exception as error:
            text += f" value=<error: {error!r}>"
        return text

    def _convert_read(self, value):
        return value

    @property
    def default(self):
        """Default value of the control, passed through `_convert_read`."""
        raw_default = get_controls_values(self.device, (self._info,), raw.ControlWhichValue.DEF_VAL)[0]
        return self._convert_read(raw_default)

    @property
    def value(self):
        """Current value passed through `_convert_read`; None for write-only controls."""
        if self.is_flagged_write_only:
            return None
        return self._convert_read(self._get_control())

    def _convert_write(self, value):
        return value

    def _mangle_write(self, value):
        return value

    @value.setter
    def value(self, value):
        converted = self._convert_write(value)
        self._set_control(self._mangle_write(converted))

    def set_to_default(self):
        """Write the default value back to the control."""
        self.value = self.default
class GenericControl(BaseMonoControl):
    """Single-value control that adds nothing to BaseMonoControl."""
class BaseNumericControl(BaseMonoControl):
    """Integer-valued control with minimum, maximum and step.

    `lower_bound` / `upper_bound` are the limits the control's own range is
    checked against at construction. When `clipping` is true, written values
    outside [minimum, maximum] are clamped; otherwise a ValueError is raised.
    """

    lower_bound = -(2**31)
    upper_bound = 2**31 - 1

    def __init__(self, device, info, control_class, clipping=True):
        super().__init__(device, info, control_class)
        description = self._info
        self.minimum = description.minimum
        self.maximum = description.maximum
        self.step = description.step
        self.clipping = clipping

        kind = self.__class__.__name__
        if self.minimum < self.lower_bound:
            raise RuntimeWarning(
                f"Control {self.config_name}'s claimed minimum value {self.minimum} exceeds lower bound of {kind}"
            )
        if self.maximum > self.upper_bound:
            raise RuntimeWarning(
                f"Control {self.config_name}'s claimed maximum value {self.maximum} exceeds upper bound of {kind}"
            )

    def _get_repr(self) -> str:
        limits = f" min={self.minimum} max={self.maximum} step={self.step}"
        return limits + super()._get_repr()

    def _convert_read(self, value):
        return value

    def _convert_write(self, value):
        """Return `value` as an int, raising ValueError when it cannot be coerced."""
        if isinstance(value, int):
            return value
        try:
            return int(value)
        except Exception:
            pass
        raise ValueError(f"Failed to coerce {value.__class__.__name__} '{value}' to int")

    def _mangle_write(self, value):
        """Clamp `value` to the control range, or reject it when clipping is off."""
        if value < self.minimum:
            if self.clipping:
                return self.minimum
            raise ValueError(f"Control {self.config_name}: {value} exceeds allowed minimum {self.minimum}")
        if value > self.maximum:
            if self.clipping:
                return self.maximum
            raise ValueError(f"Control {self.config_name}: {value} exceeds allowed maximum {self.maximum}")
        return value

    def increase(self, steps: int = 1):
        """Raise the value by `steps` times the control step."""
        self.value += steps * self.step

    def decrease(self, steps: int = 1):
        """Lower the value by `steps` times the control step."""
        self.value -= steps * self.step

    def set_to_minimum(self):
        self.value = self.minimum

    def set_to_maximum(self):
        self.value = self.maximum
class IntegerControl(BaseNumericControl):
    """Numeric control limited to the signed 32-bit range."""

    lower_bound = -(1 << 31)
    upper_bound = (1 << 31) - 1
class Integer64Control(BaseNumericControl):
    """Numeric control limited to the signed 64-bit range."""

    lower_bound = -(1 << 63)
    upper_bound = (1 << 63) - 1
class U8Control(BaseNumericControl):
    """Numeric control limited to the unsigned 8-bit range [0, 255]."""

    lower_bound = 0
    # Largest value representable in 8 bits; 2**8 itself does not fit.
    upper_bound = 2**8 - 1
class U16Control(BaseNumericControl):
    """Numeric control limited to the unsigned 16-bit range [0, 65535]."""

    lower_bound = 0
    # Largest value representable in 16 bits; 2**16 itself does not fit.
    upper_bound = 2**16 - 1
class U32Control(BaseNumericControl):
    """Numeric control limited to the unsigned 32-bit range [0, 2**32 - 1]."""

    lower_bound = 0
    # Largest value representable in 32 bits; 2**32 itself does not fit.
    upper_bound = 2**32 - 1
class BooleanControl(BaseMonoControl):
    """Control whose value is read and written as a bool.

    Strings listed in `_true` / `_false` are accepted on write; the match
    ignores case and surrounding whitespace.
    """

    _true = ["true", "1", "yes", "on", "enable"]
    _false = ["false", "0", "no", "off", "disable"]

    def _convert_read(self, value):
        return bool(value)

    def _convert_write(self, value):
        """Coerce `value` to bool.

        Raises ValueError for strings that are in neither `_true` nor
        `_false`, and for objects whose truth value cannot be computed.
        """
        if isinstance(value, bool):
            return value
        elif isinstance(value, str):
            # Normalise so that e.g. "True", "ON" or " yes " are accepted too.
            text = value.strip().lower()
            if text in self._true:
                return True
            elif text in self._false:
                return False
        else:
            try:
                v = bool(value)
            except Exception:
                pass
            else:
                return v
        raise ValueError(f"Failed to coerce {value.__class__.__name__} '{value}' to bool")
class MenuControl(BaseMonoControl, UserDict):
    """Menu control that also behaves as a dict of its menu entries.

    Keys are the menu item indexes. Values are the decoded item names for
    MENU controls and the item values for INTEGER_MENU controls.
    """

    def __init__(self, device, info, control_class):
        BaseControl.__init__(self, device, info, control_class)
        UserDict.__init__(self)

        if self.type not in (ControlType.MENU, ControlType.INTEGER_MENU):
            raise TypeError(f"MenuControl only supports control types MENU or INTEGER_MENU, but not {self.type.name}")

        if self.type == ControlType.MENU:
            entries = {item.index: item.name.decode() for item in iter_read_menu(self.device, self)}
        else:
            entries = {item.index: item.value for item in iter_read_menu(self.device, self)}
        self.data = entries

    def _convert_write(self, value):
        return int(value)
class ButtonControl(BaseControl):
    """Control without a readable value; it is triggered with push()."""

    def push(self):
        """Trigger the button by writing 1 to the control."""
        pressed = 1
        self._set_control(pressed)
class CompoundControl(BaseControl):
    """Control selected for entries flagged HAS_PAYLOAD; values pass through unconverted."""

    @property
    def default(self):
        """Default value as returned by get_controls_values."""
        defaults = get_controls_values(self.device, [self._info], raw.ControlWhichValue.DEF_VAL)
        return defaults[0]

    @property
    def value(self):
        """Current value, or None when the control is write-only."""
        if self.is_flagged_write_only:
            return None
        current = get_controls_values(self.device, [self._info])
        return current[0]

    @value.setter
    def value(self, value):
        pair = (self._info, value)
        set_controls_values(self.device, (pair,))
class DeviceHelper:
    """Base for helper objects that keep a reference to a device."""

    def __init__(self, device: Device):
        super().__init__()
        # The device this helper operates on.
        self.device = device
class InfoEx(DeviceHelper):
    """Information about a device: capabilities, formats, inputs, controls...

    The raw capabilities are read once and cached. The enumeration
    properties (formats, inputs, controls, ...) build a new result each time
    they are accessed.
    """

    INFO_REPR = """\
driver = {info.driver}
card = {info.card}
bus = {info.bus_info}
version = {info.version}
capabilities = {capabilities}
device_capabilities = {device_capabilities}
buffers = {buffers}
"""

    def __init__(self, device: "Device"):
        self.device = device
        self._raw_capabilities_cache = None

    def __repr__(self):
        device_caps_text = "|".join(cap.name for cap in flag_items(self.device_capabilities))
        caps_text = "|".join(cap.name for cap in flag_items(self.capabilities))
        buffers_text = "|".join(buff.name for buff in self.buffers)
        return self.INFO_REPR.format(
            info=self,
            capabilities=caps_text,
            device_capabilities=device_caps_text,
            buffers=buffers_text,
        )

    @property
    def raw_capabilities(self) -> raw.v4l2_capability:
        """Raw capability structure, read from the device on first access."""
        cached = self._raw_capabilities_cache
        if cached is None:
            cached = self._raw_capabilities_cache = read_capabilities(self.device)
        return cached

    @property
    def driver(self) -> str:
        return self.raw_capabilities.driver.decode()

    @property
    def card(self) -> str:
        return self.raw_capabilities.card.decode()

    @property
    def bus_info(self) -> str:
        return self.raw_capabilities.bus_info.decode()

    @property
    def version_tuple(self) -> tuple:
        """Three numbers taken from bits 16-23, 8-15 and 0-7 of the version field."""
        version = self.raw_capabilities.version
        return (
            (version >> 16) & 0xFF,
            (version >> 8) & 0xFF,
            version & 0xFF,
        )

    @property
    def version(self) -> str:
        """Dotted text form of `version_tuple`."""
        return ".".join(str(part) for part in self.version_tuple)

    @property
    def capabilities(self) -> Capability:
        return Capability(self.raw_capabilities.capabilities)

    @property
    def device_capabilities(self) -> Capability:
        return Capability(self.raw_capabilities.device_caps)

    @property
    def buffers(self):
        """Buffer types whose same-named capability is in the device capabilities."""
        dev_caps = self.device_capabilities
        supported = []
        for buffer_type in BufferType:
            if Capability[buffer_type.name] in dev_caps:
                supported.append(buffer_type)
        return supported

    def get_crop_capabilities(self, buffer_type: BufferType) -> CropCapability:
        return read_crop_capabilities(self.device, buffer_type)

    @property
    def crop_capabilities(self) -> dict[BufferType, CropCapability]:
        """Crop capabilities per supported buffer type; types that yield None are left out."""
        found = {}
        for buffer_type in CROP_BUFFER_TYPES & set(self.buffers):
            crop_cap = self.get_crop_capabilities(buffer_type)
            if crop_cap is not None:
                found[buffer_type] = crop_cap
        return found

    @property
    def formats(self):
        """Image formats of every supported buffer type found in IMAGE_FORMAT_BUFFER_TYPES."""
        collected = []
        for buffer_type in IMAGE_FORMAT_BUFFER_TYPES & set(self.buffers):
            collected.extend(iter_read_formats(self.device, buffer_type))
        return collected

    @property
    def frame_sizes(self):
        pixel_formats = {fmt.pixel_format for fmt in self.formats}
        return list(iter_read_pixel_formats_frame_intervals(self.device, pixel_formats))

    @property
    def inputs(self) -> list[Input]:
        return list(iter_read_inputs(self.device))

    @property
    def outputs(self):
        return list(iter_read_outputs(self.device))

    @property
    def controls(self):
        return list(iter_read_controls(self.device))

    @property
    def video_standards(self) -> list[Standard]:
        """List of video standards for the active input"""
        return list(iter_read_video_standards(self.device))
class BufferManager(DeviceHelper):
    """Device operations bound to one buffer type.

    Each method forwards to the device method of the same name, passing
    `self.type` as the buffer type. `size` is the number of buffers asked for
    by create_buffers / request_buffers / enqueue_buffers, and `buffers`
    holds the result of the last create/request until free_buffers is called.
    """

    def __init__(self, device: Device, buffer_type: BufferType, size: int = 2):
        super().__init__(device)
        self.type = buffer_type
        self.size = size
        self.buffers = None
        self.name = type(self).__name__

    def formats(self) -> list:
        """Return the device formats whose type matches this buffer type."""
        formats = self.device.info.formats
        return [fmt for fmt in formats if fmt.type == self.type]

    def crop_capabilities(self):
        """Return the crop capabilities of this buffer type as a list.

        The device info exposes a dict keyed by buffer type, so the selection
        is made on the key; iterating the dict directly would only yield the
        keys, which have no ``type`` attribute.
        """
        crop_capabilities = self.device.info.crop_capabilities
        return [crop for buffer_type, crop in crop_capabilities.items() if buffer_type == self.type]

    def query_buffer(self, memory, index):
        return self.device.query_buffer(self.type, memory, index)

    def enqueue_buffer(self, memory: Memory, size: int, index: int) -> raw.v4l2_buffer:
        return self.device.enqueue_buffer(self.type, memory, size, index)

    def dequeue_buffer(self, memory: Memory) -> raw.v4l2_buffer:
        return self.device.dequeue_buffer(self.type, memory)

    def enqueue_buffers(self, memory: Memory) -> list[raw.v4l2_buffer]:
        return self.device.enqueue_buffers(self.type, memory, self.size)

    def free_buffers(self, memory: Memory):
        """Free the buffers on the device and forget the local reference."""
        result = self.device.free_buffers(self.type, memory)
        self.buffers = None
        return result

    def create_buffers(self, memory: Memory):
        """Create `size` buffers; raises V4L2Error if buffers are already held."""
        if self.buffers:
            raise V4L2Error("buffers already requested. free first")
        self.buffers = self.device.create_buffers(self.type, memory, self.size)
        return self.buffers

    def request_buffers(self, memory: Memory):
        """Request `size` buffers; raises V4L2Error if buffers are already held."""
        if self.buffers:
            raise V4L2Error("buffers already requested. free first")
        self.buffers = self.device.request_buffers(self.type, memory, self.size)
        return self.buffers

    def set_format(self, width, height, pixel_format="MJPG"):
        return self.device.set_format(self.type, width, height, pixel_format)

    def get_format(self) -> Format:
        return self.device.get_format(self.type)

    def set_fps(self, fps):
        return self.device.set_fps(self.type, fps)

    def get_fps(self):
        return self.device.get_fps(self.type)

    def set_selection(self, target, rectangle):
        return self.device.set_selection(self.type, target, rectangle)

    def get_selection(self, target):
        return self.device.get_selection(self.type, target)

    def stream_on(self):
        self.device.stream_on(self.type)

    def stream_off(self):
        self.device.stream_off(self.type)

    # Aliases
    start = stream_on
    stop = stream_off
class Frame:
    """The resulting object from an acquisition.

    Bundles the frame payload (`data`), the raw buffer descriptor it came
    with (`buff`) and the format that was active (`format`).
    """

    __slots__ = ["format", "buff", "data"]

    def __init__(self, data: bytes, buff: raw.v4l2_buffer, format: Format):
        self.format = format
        self.buff = buff
        self.data = data

    def __bytes__(self):
        return self.data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        return self.data[index]

    def __repr__(self) -> str:
        kind = type(self).__name__
        return (
            f"<{kind} width={self.width}, height={self.height}, "
            f"format={self.pixel_format.name}, frame_nb={self.frame_nb}, timestamp={self.timestamp}>"
        )

    def __format__(self, spec):
        """Format the frame.

        "" / "s" give str(), "r" gives repr(), "f" gives size and pixel
        format, "l" adds frame number and timestamp in front of "f". Any
        other spec is taken as an attribute name whose value is formatted
        with str().
        """
        if spec in {"", "s"}:
            return str(self)
        if spec == "r":
            return repr(self)
        if spec == "f":
            return f"{self.width}x{self.height} {self.pixel_format.name}"
        if spec == "l":
            return f"#{self.frame_nb} {self.timestamp} {self:f}"
        return str(getattr(self, spec))

    @property
    def width(self):
        return self.format.width

    @property
    def height(self):
        return self.format.height

    @property
    def nbytes(self):
        return self.buff.bytesused

    @property
    def pixel_format(self):
        return PixelFormat(self.format.pixel_format)

    @property
    def index(self):
        return self.buff.index

    @property
    def type(self):
        return BufferType(self.buff.type)

    @property
    def flags(self):
        return BufferFlag(self.buff.flags)

    @property
    def timestamp(self):
        """Buffer timestamp as a float: secs plus usecs scaled by 1e-6."""
        stamp = self.buff.timestamp
        return stamp.secs + stamp.usecs * 1e-6

    @property
    def frame_nb(self):
        return self.buff.sequence

    @property
    def memory(self):
        return Memory(self.buff.memory)

    @property
    def time_type(self):
        """Timecode type, or None when the buffer has no TIMECODE flag."""
        if BufferFlag.TIMECODE not in self.flags:
            return None
        return TimeCodeType(self.buff.timecode.type)

    @property
    def time_flags(self):
        """Timecode flags, or None when the buffer has no TIMECODE flag."""
        if BufferFlag.TIMECODE not in self.flags:
            return None
        return TimeCodeFlag(self.buff.timecode.flags)

    @property
    def time_frame(self):
        """Timecode frame count, or None when the buffer has no TIMECODE flag."""
        if BufferFlag.TIMECODE not in self.flags:
            return None
        return self.buff.timecode.frames

    @property
    def array(self):
        """The payload as a one-dimensional numpy array of unsigned bytes."""
        # Imported here so numpy is only needed when this property is used.
        import numpy

        return numpy.frombuffer(self.data, dtype="u1")
class VideoCapture(BufferManager):
    """Buffer manager for the VIDEO_CAPTURE buffer type.

    Usable as a context manager (open on enter, close on exit) and as a
    sync or async iterable of frames once opened.

    `source` optionally overrides the capability set used to choose between
    memory-mapped streaming and plain read(); by default the device
    capabilities are used.
    """

    def __init__(self, device: Device, size: int = 2, source: Capability = None):
        super().__init__(device, BufferType.VIDEO_CAPTURE, size)
        self.buffer = None
        self.source = source

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, *exc):
        self.close()

    def __iter__(self):
        yield from self.buffer

    async def __aiter__(self):
        async for frame in self.buffer:
            yield frame

    def open(self):
        """Prepare the frame source and start streaming; no-op when already open.

        Raises V4L2Error if the device lacks VIDEO_CAPTURE capability and
        OSError if neither STREAMING nor READWRITE is available.
        """
        if self.buffer is None:
            self.device.log.info("Preparing for video capture...")
            capabilities = self.device.info.device_capabilities
            if Capability.VIDEO_CAPTURE not in capabilities:
                raise V4L2Error("device lacks VIDEO_CAPTURE capability")
            source = capabilities if self.source is None else self.source
            if Capability.STREAMING in source:
                self.device.log.info("Video capture using memory map")
                self.buffer = MemoryMap(self)
            elif Capability.READWRITE in source:
                self.device.log.info("Video capture using read")
                self.buffer = ReadSource(self)
            else:
                raise OSError("Device needs to support STREAMING or READWRITE capability")
            self.buffer.open()
            self.stream_on()
            self.device.log.info("Video capture started!")

    def close(self):
        """Stop streaming and release the frame source; no-op when not open.

        The frame source is closed and forgotten even if stopping the stream
        fails, so buffers are not leaked and a later open() starts afresh.
        Errors are still propagated to the caller.
        """
        if self.buffer:
            self.device.log.info("Closing video capture...")
            buffer = self.buffer
            try:
                self.stream_off()
            finally:
                try:
                    buffer.close()
                finally:
                    self.buffer = None
            self.device.log.info("Video capture closed")
class ReadSource(ReentrantOpen):
    """Frame source that reads frames from the device file descriptor with os.read."""

    def __init__(self, buffer_manager: BufferManager):
        super().__init__()
        self.buffer_manager = buffer_manager
        self.frame_reader = FrameReader(self.device, self.raw_read)
        self.format = None

    def __iter__(self) -> Iterator[Frame]:
        reader = self.frame_reader
        with reader:
            while True:
                yield reader.read()

    async def __aiter__(self) -> AsyncIterator[Frame]:
        reader = self.frame_reader
        async with reader:
            while True:
                yield await reader.aread()

    def open(self) -> None:
        # Remember the format in use; it is attached to every Frame produced.
        self.format = self.buffer_manager.get_format()

    def close(self) -> None:
        self.format = None

    @property
    def device(self) -> Device:
        return self.buffer_manager.device

    def raw_grab(self) -> tuple[bytes, raw.v4l2_buffer]:
        """Read once from the device and build a buffer descriptor for the data.

        The descriptor carries the number of bytes read and a timestamp taken
        from time.time_ns() right after the read.
        """
        payload = os.read(self.device.fileno(), 2**31 - 1)
        now_ns = time.time_ns()
        descriptor = raw.v4l2_buffer()
        descriptor.bytesused = len(payload)
        descriptor.timestamp.set_ns(now_ns)
        return payload, descriptor

    def raw_read(self) -> Frame:
        payload, descriptor = self.raw_grab()
        return Frame(payload, descriptor, self.format)

    def wait_read(self) -> Frame:
        """Wait with the device's select (when it has one) until readable, then read."""
        device = self.device
        select = device.io.select
        if select is not None:
            select((device,), (), ())
        return self.raw_read()

    def read(self) -> Frame:
        # On the first call, pick the implementation that matches how the
        # device was opened (blocking vs non-blocking) and rebind self.read
        # to it, so later calls go straight to the chosen method.
        self.read = self.raw_read if self.device.is_blocking else self.wait_read
        return self.read()
class MemorySource(ReentrantOpen):
    """Frame source/sink that exchanges buffers with the device through a BufferQueue.

    Subclasses provide prepare_buffers() to fill `self.buffers`.
    """

    def __init__(self, buffer_manager: BufferManager, source: Memory):
        super().__init__()
        self.buffer_manager = buffer_manager
        self.source = source
        self.buffers = None
        self.queue = BufferQueue(buffer_manager, source)
        self.frame_reader = FrameReader(self.device, self.raw_read)
        self.format = None

    def __iter__(self) -> Iterator[Frame]:
        with self.frame_reader as reader:
            yield from reader

    def __aiter__(self) -> AsyncIterator[Frame]:
        fd = self.device.fileno()
        return astream(fd, self.raw_read)

    @property
    def device(self) -> Device:
        return self.buffer_manager.device

    def prepare_buffers(self):
        """Allocate the buffers. Must be implemented by subclasses."""
        raise NotImplementedError

    def release_buffers(self):
        """Close every buffer, free them on the device and forget the format."""
        log = self.device.log
        log.info("Freeing buffers...")
        for memory in self.buffers:
            memory.close()
        self.buffers = None
        self.buffer_manager.free_buffers(self.source)
        self.format = None
        log.info("Buffers freed")

    def open(self) -> None:
        self.format = self.buffer_manager.get_format()
        if self.buffers is None:
            self.prepare_buffers()

    def close(self) -> None:
        if self.buffers:
            self.release_buffers()

    def grab_from_buffer(self, buff: raw.v4l2_buffer):
        """Return the first `bytesused` bytes of the memory at `buff.index`, with `buff`."""
        memory = self.buffers[buff.index]
        return memory[: buff.bytesused], buff

    def raw_grab(self) -> tuple[Buffer, raw.v4l2_buffer]:
        # The queue context dequeues a buffer on entry and queues it again on exit.
        with self.queue as dequeued:
            return self.grab_from_buffer(dequeued)

    def raw_read(self) -> Frame:
        payload, descriptor = self.raw_grab()
        return Frame(payload, descriptor, self.format)

    def wait_read(self) -> Frame:
        """Wait with the device's select (when it has one) until readable, then read."""
        device = self.device
        select = device.io.select
        if select is not None:
            select((device,), (), ())
        return self.raw_read()

    def read(self) -> Frame:
        # On the first call, pick the implementation that matches how the
        # device was opened (blocking vs non-blocking): with O_NONBLOCK the
        # dequeue does not wait for a buffer, so we wait here instead.
        # self.read is rebound so later calls skip this check.
        self.read = self.raw_read if self.device.is_blocking else self.wait_read
        return self.read()

    def raw_write(self, data: Buffer) -> raw.v4l2_buffer:
        """Copy `data` into the next dequeued buffer and return its descriptor."""
        with self.queue as dequeued:
            nbytes = getattr(data, "nbytes", len(data))
            target = self.buffers[dequeued.index]
            target[:nbytes] = data
            dequeued.bytesused = nbytes
        return dequeued

    def wait_write(self, data: Buffer) -> raw.v4l2_buffer:
        """Wait with the device's select (when it has one) until writable, then write."""
        device = self.device
        if device.io.select is not None:
            _, _writable, _ = device.io.select((), (device,), ())
        return self.raw_write(data)

    def write(self, data: Buffer) -> raw.v4l2_buffer:
        # Same first-call dispatch as read(), for the write direction.
        self.write = self.raw_write if self.device.is_blocking else self.wait_write
        return self.write(data)
class UserPtr(MemorySource):
    """Memory source using Memory.USERPTR with buffers allocated through ctypes."""

    def __init__(self, buffer_manager: BufferManager):
        super().__init__(buffer_manager, Memory.USERPTR)

    def prepare_buffers(self):
        """Allocate one ctypes string buffer per slot and queue each on the device."""
        log = self.device.log
        manager = self.buffer_manager
        log.info("Reserving buffers...")
        manager.create_buffers(self.source)
        nbytes = self.format.size
        self.buffers = []
        for slot in range(manager.size):
            storage = ctypes.create_string_buffer(nbytes)
            # Keep a reference so the memory handed to the device stays alive.
            self.buffers.append(storage)
            descriptor = raw.v4l2_buffer()
            descriptor.index = slot
            descriptor.type = manager.type
            descriptor.memory = self.source
            descriptor.m.userptr = ctypes.addressof(storage)
            descriptor.length = nbytes
            self.queue.enqueue(descriptor)
        log.info("Buffers reserved")
class MemoryMap(MemorySource):
    """Memory source using Memory.MMAP: device buffers mapped into this process."""

    def __init__(self, buffer_manager: BufferManager):
        super().__init__(buffer_manager, Memory.MMAP)

    def prepare_buffers(self):
        """Create the device buffers, map each one and queue them all."""
        log = self.device.log
        manager = self.buffer_manager
        log.info("Reserving buffers...")
        descriptors = manager.create_buffers(self.source)
        fd = self.device.fileno()
        mapped = []
        for descriptor in descriptors:
            mapped.append(mmap_from_buffer(fd, descriptor))
        self.buffers = mapped
        manager.enqueue_buffers(Memory.MMAP)
        # Read the format again once the buffers are in place.
        self.format = manager.get_format()
        log.info("Buffers reserved")
class EventReader:
    """Read events from a device, synchronously or with asyncio.

    Synchronous use: read() or plain iteration. Asynchronous use: enter the
    object with ``async with`` and then call aread() or iterate with
    ``async for``; at most `max_queue_size` pending events are kept, the
    oldest being dropped when a new one arrives on a full queue.
    """

    def __init__(self, device: "Device", max_queue_size=100):
        self.device = device
        self._loop = None
        self._selector = None
        self._buffer = None
        self._max_queue_size = max_queue_size

    async def __aenter__(self):
        if self.device.is_blocking:
            raise V4L2Error("Cannot use async event reader on blocking device")
        self._buffer = asyncio.Queue(maxsize=self._max_queue_size)
        self._selector = select.epoll()
        self._loop = asyncio.get_event_loop()
        # The loop watches the epoll descriptor, which in turn watches the
        # device for EPOLLPRI.
        self._loop.add_reader(self._selector.fileno(), self._on_event)
        self._selector.register(self.device.fileno(), select.EPOLLPRI)
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        self._selector.unregister(self.device.fileno())
        self._loop.remove_reader(self._selector.fileno())
        self._selector.close()
        self._selector = None
        self._loop = None
        self._buffer = None

    async def __aiter__(self):
        while True:
            yield await self.aread()

    def __iter__(self):
        while True:
            yield self.read()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        pass

    def _on_event(self):
        """Loop callback: dequeue one event and put it, as a future, in the queue.

        A failure to dequeue is stored as the future's exception so that it
        is raised to whoever awaits it in aread().
        """
        task = self._loop.create_future()
        try:
            self._selector.poll(0)  # avoid blocking
            event = self.device.deque_event()
            task.set_result(event)
        except Exception as error:
            task.set_exception(error)

        buffer = self._buffer
        if buffer.full():
            self.device.log.debug("missed event")
            # Drop the oldest pending event to make room for the new one.
            buffer.get_nowait()
        buffer.put_nowait(task)

    def read(self, timeout=None):
        """Return the next event.

        On a non-blocking device, first wait (up to `timeout`) with select for
        an exceptional condition on the device; return None when there is none.
        """
        if not self.device.is_blocking:
            _, _, exc = self.device.io.select((), (), (self.device,), timeout)
            if not exc:
                return
        return self.device.deque_event()

    async def aread(self):
        """Wait for next event or return last event in queue"""
        task = await self._buffer.get()
        return await task
class FrameReader:
    """Read frames from a device, synchronously or with asyncio.

    `raw_read` is the callable that actually produces a frame. Synchronous
    use: read() or plain iteration. Asynchronous use: enter the object with
    ``async with`` and call aread(); at most `max_queue_size` pending frames
    are kept, the oldest being dropped when a new one arrives on a full queue.
    """

    def __init__(self, device: Device, raw_read: Callable[[], Buffer], max_queue_size: int = 1):
        self.device = device
        self.raw_read = raw_read
        self._loop = None
        self._selector = None
        self._buffer = None
        self._max_queue_size = max_queue_size
        self._device_fd = None

    async def __aenter__(self) -> Self:
        if self.device.is_blocking:
            raise V4L2Error("Cannot use async frame reader on blocking device")
        # Remembered so it can still be unregistered if the device is closed.
        self._device_fd = self.device.fileno()
        self._buffer = asyncio.Queue(maxsize=self._max_queue_size)
        self._selector = select.epoll()
        self._loop = asyncio.get_event_loop()
        self._loop.add_reader(self._selector.fileno(), self._on_event)
        self._selector.register(self._device_fd, select.POLLIN)
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        with contextlib.suppress(OSError):
            # device may have been closed by now
            self._selector.unregister(self._device_fd)
        self._loop.remove_reader(self._selector.fileno())
        self._selector.close()
        self._selector = None
        self._loop = None
        self._buffer = None

    def __enter__(self) -> Self:
        return self

    def __exit__(self, exc_type, exc_value, tb):
        pass

    def __iter__(self):
        while True:
            yield self.read()

    def _on_event(self) -> None:
        """Loop callback: read one frame and put it, as a future, in the queue.

        A failure to read is stored as the future's exception so that it is
        raised to whoever awaits it in aread().
        """
        task = self._loop.create_future()
        try:
            self._selector.poll(0)  # avoid blocking
            data = self.raw_read()
            task.set_result(data)
        except Exception as error:
            task.set_exception(error)

        buffer = self._buffer
        if buffer.full():
            # Logger.warn is a deprecated alias; warning() is the supported name.
            self.device.log.warning("missed frame")
            # Drop the oldest pending frame to make room for the new one.
            buffer.get_nowait()
        buffer.put_nowait(task)

    def read(self, timeout: Optional[float] = None) -> Frame:
        """Return the next frame.

        On a non-blocking device, first wait (up to `timeout`) with select for
        the device to become readable; return None when it does not.
        """
        if not self.device.is_blocking:
            read, _, _ = self.device.io.select((self.device,), (), (), timeout)
            if not read:
                return
        return self.raw_read()

    async def aread(self) -> Frame:
        """Wait for next frame or return last frame"""
        task = await self._buffer.get()
        return await task
class BufferQueue:
    """Context manager that dequeues a buffer on entry and queues it again on exit."""

    def __init__(self, buffer_manager: BufferManager, memory: Memory):
        self.buffer_manager = buffer_manager
        self.memory = memory
        self.raw_buffer = None

    def enqueue(self, buff: raw.v4l2_buffer):
        fd = self.buffer_manager.device.fileno()
        enqueue_buffer_raw(fd, buff)

    def dequeue(self):
        return self.buffer_manager.dequeue_buffer(self.memory)

    def __enter__(self) -> raw.v4l2_buffer:
        # get next buffer that has some data in it
        dequeued = self.dequeue()
        self.raw_buffer = dequeued
        return dequeued

    def __exit__(self, *exc):
        # Queue a copy rather than the dequeued buffer itself: the original
        # must keep its frame info (frame number, timestamp, etc) for
        # whoever received it from __enter__.
        duplicate = raw.v4l2_buffer()
        memcpy(duplicate, self.raw_buffer)
        self.enqueue(duplicate)
class Write(ReentrantOpen):
    """Output sink that sends data with the device's write()."""

    def __init__(self, buffer_manager: BufferManager):
        super().__init__()
        self.buffer_manager = buffer_manager

    @property
    def device(self) -> Device:
        return self.buffer_manager.device

    def raw_write(self, data: Buffer) -> None:
        self.device.write(data)

    def wait_write(self, data: Buffer) -> None:
        """Wait with the device's select (when it has one) until writable, then write.

        Raises OSError("Closed") when select reports nothing writable.
        """
        device = self.device
        select = device.io.select
        if select is not None:
            _, writable, _ = select((), (device,), ())
            if not writable:
                raise OSError("Closed")
        self.raw_write(data)

    def write(self, data: Buffer) -> None:
        # On the first call, pick the implementation that matches how the
        # device was opened (blocking vs non-blocking) and rebind self.write
        # to it, so later calls go straight to the chosen method.
        self.write = self.raw_write if self.device.is_blocking else self.wait_write
        return self.write(data)

    def open(self) -> None:
        """Nothing to prepare."""

    def close(self) -> None:
        """Nothing to release."""
class VideoOutput(BufferManager):
    """Buffer manager for the VIDEO_OUTPUT buffer type.

    Usable as a context manager (open on enter, close on exit).

    `sink` optionally overrides the capability set used to choose between
    memory-mapped streaming and plain write(); by default the capabilities
    reported by the device info are used.
    """

    def __init__(self, device: Device, size: int = 2, sink: Capability = None):
        super().__init__(device, BufferType.VIDEO_OUTPUT, size)
        self.buffer = None
        self.sink = sink

    def __enter__(self) -> Self:
        self.open()
        return self

    def __exit__(self, *exc) -> None:
        self.close()

    def open(self) -> None:
        """Prepare the output sink and start streaming; no-op when already open.

        Raises OSError if neither STREAMING nor READWRITE is available.
        """
        if self.buffer is None:
            log = self.device.log
            log.info("Preparing for video output...")
            capabilities = self.device.info.capabilities
            # Don't check for output capability. Some drivers (ex: v4l2loopback) don't
            # report being output capable so that apps like zoom recognize them
            sink = self.sink if self.sink is not None else capabilities
            if Capability.STREAMING in sink:
                log.info("Video output using memory map")
                self.buffer = MemoryMap(self)
            elif Capability.READWRITE in sink:
                log.info("Video output using write")
                self.buffer = Write(self)
            else:
                raise OSError("Device needs to support STREAMING or READWRITE capability")
            self.buffer.open()
            self.stream_on()
            log.info("Video output started!")

    def close(self) -> None:
        """Stop streaming and release the sink; failures are logged, not raised."""
        if not self.buffer:
            return
        log = self.device.log
        log.info("Closing video output...")
        try:
            self.stream_off()
        except Exception as error:
            log.warning("Failed to close stream: %r", error)
        try:
            self.buffer.close()
        except Exception as error:
            log.warning("Failed to close buffer: %r", error)
        self.buffer = None
        log.info("Video output closed")

    def write(self, data: Buffer) -> None:
        """Send `data` through the sink prepared by open()."""
        sink = self.buffer
        sink.write(data)
def iter_video_files(path: PathLike = "/dev") -> Iterable[Path]:
    """Iterate over the device files under *path* matching ``video*``."""
    video_pattern = "video*"
    return iter_device_files(path=path, pattern=video_pattern)
def iter_devices(path: PathLike = "/dev", **kwargs) -> Iterable[Device]:
    """Iterate over a Device for each video file found under *path*.

    Extra keyword arguments are forwarded to the Device constructor.
    """
    video_files = iter_video_files(path=path)
    return (Device(video_file, **kwargs) for video_file in video_files)
def iter_video_capture_files(path: PathLike = "/dev") -> Iterable[Path]:
    """Iterate over the video files under *path* that report VIDEO_CAPTURE.

    Each candidate file is opened to read its capabilities when the
    returned iterator reaches it.
    """

    def supports_capture(video_file):
        # The file is only kept open while its capabilities are queried.
        with IO.open(video_file) as fobj:
            capabilities = read_capabilities(fobj.fileno())
            device_caps = Capability(capabilities.device_caps)
            return Capability.VIDEO_CAPTURE in device_caps

    candidates = iter_video_files(path)
    return filter(supports_capture, candidates)
def iter_video_capture_devices(path: PathLike = "/dev", **kwargs) -> Iterable[Device]:
    """Iterate over a Device for each video file under *path* with CAPTURE capability.

    Extra keyword arguments are forwarded to the Device constructor.
    """
    capture_files = iter_video_capture_files(path)
    return (Device(capture_file, **kwargs) for capture_file in capture_files)
def iter_video_output_files(path: PathLike = "/dev") -> Iterable[Path]:
    """Iterate over the video files under *path* that report VIDEO_OUTPUT.

    Some results might be missing: some drivers (ex: v4l2loopback) don't
    report being output capable, so that apps like zoom recognize them as
    valid capture devices.
    """

    def supports_output(video_file):
        # The file is only kept open while its capabilities are queried.
        with IO.open(video_file) as fobj:
            capabilities = read_capabilities(fobj.fileno())
            device_caps = Capability(capabilities.device_caps)
            return Capability.VIDEO_OUTPUT in device_caps

    candidates = iter_video_files(path)
    return filter(supports_output, candidates)
def iter_video_output_devices(path: PathLike = "/dev", **kwargs) -> Iterable[Device]:
    """Iterate over a Device for each video file under *path* with VIDEO OUTPUT capability.

    Extra keyword arguments are forwarded to the Device constructor.
    """
    output_files = iter_video_output_files(path)
    return (Device(output_file, **kwargs) for output_file in output_files)
# Module-level `find` helper, built by `make_find` from `iter_devices`.
# NOTE(review): the lookup semantics (accepted arguments, return value) are
# defined by `linuxpy.util.make_find` and are not visible here; confirm there.
2264find = make_find(iter_devices)