Coverage for linuxpy/device.py: 94%
97 statements
« prev ^ index » next coverage.py v7.10.4, created at 2026-02-19 15:11 +0100
« prev ^ index » next coverage.py v7.10.4, created at 2026-02-19 15:11 +0100
1#
2# This file is part of the linuxpy project
3#
4# Copyright (c) 2023 Tiago Coutinho
5# Distributed under the GPLv3 license. See LICENSE for more info.
7import contextlib
8import logging
9import os
10import pathlib
11from io import IOBase
13from linuxpy.types import Iterable, Optional, PathLike
15from .io import IO
17DEV_PATH = pathlib.Path("/dev")
20log = logging.getLogger(__name__)
class ReentrantOpen(contextlib.AbstractContextManager):
    """Reusable, reentrant (but not thread safe) open/close context manager base.

    A nesting counter tracks how many ``with`` blocks are currently active on
    the object: ``open()`` runs when the outermost block is entered and
    ``close()`` runs when the outermost block is left.
    """

    def __init__(self):
        # Number of ``with`` blocks currently entered on this object
        self._context_level = 0

    def __enter__(self):
        """Open the resource on the outermost entry and return ``self``."""
        if self._context_level == 0:
            self.open()
        # Only counted once open() has returned without raising
        self._context_level += 1
        return self

    def __exit__(self, *exc):
        """Close the resource when the outermost ``with`` block is left."""
        self._context_level -= 1
        if self._context_level == 0:
            self.close()

    def open(self):
        """Acquire the resource. Concrete sub-classes must override this."""
        raise NotImplementedError

    def close(self):
        """Release the resource. Concrete sub-classes must override this."""
        raise NotImplementedError
def device_number(path: PathLike) -> Optional[int]:
    """Retrieve the device number from a path like. Example: gives 12 for /dev/video12

    Args:
        path: anything whose ``str()`` is the device path.

    Returns:
        The integer formed by the trailing run of decimal digits of the path,
        or None when the path does not end with a decimal digit.
    """
    text = str(path)
    start = len(text)
    # Walk backwards over the trailing digits. isdecimal() is used instead of
    # isdigit() because isdigit() also accepts characters such as superscript
    # digits, which int() refuses with a ValueError.
    while start > 0 and text[start - 1].isdecimal():
        start -= 1
    digits = text[start:]
    return int(digits) if digits else None
def is_device_file(path: PathLike, read_write: bool = True):
    """Tell whether a path like is a readable (and, optionally, writable) character device.

    Returns True only when the path is a character device and the requested
    access (read, plus write when ``read_write`` is true) is granted according
    to ``os.access``; False otherwise.
    """
    candidate = pathlib.Path(path)
    wanted = (os.R_OK | os.W_OK) if read_write else os.R_OK
    return candidate.is_char_device() and os.access(str(candidate), wanted)
def iter_device_files(path: PathLike = "/dev", pattern: str = "*") -> Iterable[pathlib.Path]:
    """Lazily yield the accessible (read & write) char device files found under ``path``.

    Entries of ``path`` matching the glob ``pattern`` are kept only when
    ``is_device_file`` accepts them with its default read & write check.
    """
    candidates = pathlib.Path(path).glob(pattern)
    return filter(is_device_file, candidates)
class BaseDevice(ReentrantOpen):
    """Base class for a reentrant device.

    A device is built either from a path (``str`` or ``pathlib.Path``), in
    which case the file is opened through ``io.open`` by ``open()`` or on the
    first context entry, or from an already opened file object, in which case
    the context manager of this object never closes the file.

    Sub-classes may override the ``_on_open`` / ``_on_close`` hooks and set
    ``PREFIX``, which ``from_id`` prepends to the device id.
    """

    # Path prefix used by from_id(); the base class does not define one
    PREFIX = None

    def __init__(self, name_or_file, read_write=True, io=IO, blocking=False):
        """Initialize the device without opening any path given by name.

        Args:
            name_or_file: device path (``str`` or ``pathlib.Path``) or an
                already opened file object (``IOBase``).
            read_write: access requested when the device is opened by name.
                Ignored for file objects, whose mode is inspected instead.
            io: object providing ``open(filename, read_write, blocking=...)``
                and an ``os`` attribute with ``get_blocking``/``set_blocking``.
            blocking: blocking flag handed to ``io.open``.

        Raises:
            TypeError: if ``name_or_file`` is none of the supported types.
        """
        super().__init__()
        self.io = io
        self.blocking = blocking
        wraps_open_file = False
        # A tuple (instead of ``str | pathlib.Path``) keeps the check working
        # on interpreters older than Python 3.10
        if isinstance(name_or_file, (str, pathlib.Path)):
            filename = pathlib.Path(name_or_file)
            self._read_write = read_write
            self._fobj = None
        elif isinstance(name_or_file, IOBase):
            filename = pathlib.Path(name_or_file.name)
            self._read_write = "+" in name_or_file.mode
            self._fobj = name_or_file
            # this object context manager won't close the file anymore
            self._context_level += 1
            wraps_open_file = True
        else:
            raise TypeError(
                f"name_or_file must be a Path, str or a file-like object, not {name_or_file.__class__.__name__}"
            )
        self.log = log.getChild(filename.stem)
        self.filename = filename
        self.index = device_number(filename)
        if wraps_open_file:
            # Run the hook only once log, filename and index exist, so that
            # sub-class hooks can rely on a fully initialized object
            self._on_open()

    def __repr__(self):
        return f"<{type(self).__name__} name={self.filename}, closed={self.closed}>"

    def _on_open(self):
        """Hook called once the device is open. Does nothing by default."""
        pass

    def _on_close(self):
        """Hook called right before the device is closed. Does nothing by default."""
        pass

    @classmethod
    def from_id(cls, did: int, **kwargs):
        """Create a new Device from the given id.

        The device name is ``PREFIX`` followed by ``did``; ``kwargs`` are
        forwarded to the constructor.
        """
        return cls(f"{cls.PREFIX}{did}", **kwargs)

    def open(self):
        """Open the device if not already open. Triggers _on_open after the underlying OS open has succeeded.

        If ``_on_open`` raises, the freshly opened file is closed again and the
        exception is propagated.
        """
        if self._fobj is not None:
            return
        self.log.info("opening %s", self.filename)
        self._fobj = self.io.open(self.filename, self._read_write, blocking=self.blocking)
        try:
            self._on_open()
        except BaseException:
            # Do not leak the file nor leave a device that looks open although
            # its open hook never completed
            fobj, self._fobj = self._fobj, None
            fobj.close()
            raise
        self.log.info("opened %s", self.filename)

    def close(self):
        """Closes the device if not already closed. Triggers _on_close before the underlying OS close.

        The underlying file is closed even when ``_on_close`` raises; the
        exception is propagated afterwards.
        """
        if self.closed:
            return
        try:
            self._on_close()
        finally:
            self.log.info("closing %s", self.filename)
            # Forget the file object first so that a failing close() cannot
            # leave a stale reference behind
            fobj, self._fobj = self._fobj, None
            fobj.close()
            self.log.info("closed %s", self.filename)

    def fileno(self) -> int:
        """Return the underlying file descriptor (an integer) of the stream if it exists"""
        return self._fobj.fileno()

    @property
    def closed(self) -> bool:
        """True if the stream is closed. Always succeeds"""
        return self._fobj is None or self._fobj.closed

    @property
    def is_blocking(self) -> bool:
        """
        True if the underlying OS is opened in blocking mode.
        Raises error if device is not opened.
        """
        return self.io.os.get_blocking(self.fileno())

    def set_blocking(self, yes_no: bool) -> None:
        """
        Turns on/off blocking mode.
        Raises error if device is not opened.
        """
        self.io.os.set_blocking(self.fileno(), yes_no)
        self.blocking = yes_no