Coverage for linuxpy/device.py: 94%
93 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-10-21 07:58 +0200
1#
2# This file is part of the linuxpy project
3#
4# Copyright (c) 2023 Tiago Coutinho
5# Distributed under the GPLv3 license. See LICENSE for more info.
7import contextlib
8import logging
9import os
10import pathlib
11from io import IOBase
13from linuxpy.types import Iterable, Optional, PathLike
15from .io import IO
# Directory where device nodes conventionally live.
DEV_PATH = pathlib.Path("/dev")


# Module-level logger; BaseDevice derives one child logger per device from it.
log = logging.getLogger(__name__)
class ReentrantOpen(contextlib.AbstractContextManager):
    """Reusable, reentrant open/close context manager base (not thread safe).

    A nesting counter tracks how many ``with`` blocks are currently active on
    the object: entering the outermost block calls ``open()`` and leaving the
    outermost block calls ``close()``. Nested blocks only adjust the counter.
    """

    def __init__(self):
        # Number of ``with`` blocks currently active on this object.
        self._context_level = 0

    def __enter__(self):
        # Only the outermost entry opens the resource. The counter is bumped
        # after open() so a failing open() leaves the nesting level untouched.
        if self._context_level == 0:
            self.open()
        self._context_level += 1
        return self

    def __exit__(self, *exc_info):
        # Only the outermost exit closes the resource. Nothing is returned, so
        # an exception raised inside the ``with`` body keeps propagating.
        self._context_level -= 1
        if self._context_level == 0:
            self.close()

    def open(self):
        """Acquire the resource. Concrete sub-classes must override this."""
        raise NotImplementedError

    def close(self):
        """Release the resource. Concrete sub-classes must override this."""
        raise NotImplementedError
def device_number(path: PathLike) -> Optional[int]:
    """Retrieve the device number from a path like. Example: gives 12 for /dev/video12

    The number is the run of decimal digits at the very end of ``str(path)``.

    Args:
        path: anything whose ``str()`` is the device path or name.

    Returns:
        The trailing number as an int, or None when the path does not end in a
        decimal digit (this includes the empty string).
    """
    text = str(path)
    start = len(text)
    # Walk backwards over the trailing digits. str.isdecimal() is used instead of
    # str.isdigit() because isdigit() also accepts characters such as superscripts
    # ("²") that int() cannot parse, which would raise ValueError.
    while start and text[start - 1].isdecimal():
        start -= 1
    digits = text[start:]
    return int(digits) if digits else None
def is_device_file(path: PathLike, read_write: bool = True):
    """Tell whether a path like is a readable (and, optionally, writable) character device.

    Args:
        path: path of the candidate device node.
        read_write: when True (default) the node must be readable and writable;
            when False read access is enough.

    Returns:
        True if the path is a character device the current process may access
        with the requested mode, False otherwise.
    """
    candidate = pathlib.Path(path)
    if read_write:
        mode = os.R_OK | os.W_OK
    else:
        mode = os.R_OK
    # Short-circuit: the permission check only runs for character devices.
    return candidate.is_char_device() and os.access(str(candidate), mode)
def iter_device_files(path: PathLike = "/dev", pattern: str = "*") -> Iterable[pathlib.Path]:
    """Lazily yield the accessible (read & write) char device files under the given path.

    Args:
        path: directory to search (defaults to "/dev").
        pattern: glob pattern, relative to ``path``, selecting the candidates.

    Returns:
        A lazy iterable of the matching paths for which ``is_device_file`` is true.
    """
    candidates = pathlib.Path(path).glob(pattern)
    # is_device_file is applied with its default read_write=True.
    return filter(is_device_file, candidates)
class BaseDevice(ReentrantOpen):
    """Base class for a reentrant device.

    A device is created either from a file name (opened lazily through ``io.open``
    when the outermost context is entered or ``open()`` is called) or from an
    already opened file object, which is adopted as is and never closed by the
    context manager.

    Sub-classes may override the ``_on_open`` / ``_on_close`` hooks and set
    ``PREFIX`` so that ``from_id`` can build the device file name.
    """

    # File name prefix used by from_id(); None here, to be set by sub-classes.
    PREFIX = None

    def __init__(self, name_or_file, read_write=True, io=IO):
        """Initialize the device.

        Args:
            name_or_file: a str / pathlib.Path with the device file name, or an
                already opened file object (io.IOBase instance).
            read_write: open mode flag handed to ``io.open``. Ignored when a file
                object is given: it is then deduced from "+" in the object's mode.
            io: object providing ``open(filename, read_write)``.

        Raises:
            TypeError: if name_or_file is neither a path nor a file object.
        """
        super().__init__()
        self.io = io
        if isinstance(name_or_file, (str, pathlib.Path)):
            filename = pathlib.Path(name_or_file)
            self._read_write = read_write
            self._fobj = None
            adopted = False
        elif isinstance(name_or_file, IOBase):
            filename = pathlib.Path(name_or_file.name)
            self._read_write = "+" in name_or_file.mode
            self._fobj = name_or_file
            # this object context manager won't close the file anymore
            self._context_level += 1
            adopted = True
        else:
            raise TypeError(
                f"name_or_file must be a Path, str or a file-like object, not {name_or_file.__class__.__name__}"
            )
        self.log = log.getChild(filename.stem)
        self.filename = filename
        self.index = device_number(filename)
        if adopted:
            # Run the hook only once log, filename and index exist, so that an
            # overridden _on_open never sees a half-initialized object.
            self._on_open()

    def __repr__(self):
        return f"<{type(self).__name__} name={self.filename}, closed={self.closed}>"

    def _on_open(self):
        """Hook called after the device has been opened. Does nothing by default."""
        pass

    def _on_close(self):
        """Hook called before the device is closed. Does nothing by default."""
        pass

    @classmethod
    def from_id(cls, did: int, **kwargs):
        """Create a new Device from the given id. The file name is PREFIX followed by the id"""
        return cls(f"{cls.PREFIX}{did}", **kwargs)

    def open(self):
        """
        Open the device if not already open. Triggers _on_open after the underlying OS open has succeeded.
        If _on_open fails, the file is closed again and the error is re-raised.
        """
        if not self._fobj:
            self.log.info("opening %s", self.filename)
            fobj = self.io.open(self.filename, self._read_write)
            # The hook may need the file (e.g. through fileno()), so publish it first.
            self._fobj = fobj
            try:
                self._on_open()
            except BaseException:
                # Do not leave a half-opened device behind: release the file so a
                # later open() starts from scratch.
                self._fobj = None
                fobj.close()
                raise
            self.log.info("opened %s", self.filename)

    def close(self):
        """
        Closes the device if not already closed. Triggers _on_close before the underlying OS close is done.
        The file is closed even if _on_close fails; the error still propagates.
        """
        if not self.closed:
            try:
                self._on_close()
            finally:
                self.log.info("closing %s", self.filename)
                self._fobj.close()
                self._fobj = None
                self.log.info("closed %s", self.filename)

    def fileno(self) -> int:
        """
        Return the underlying file descriptor (an integer) of the stream if it exists.
        Raises AttributeError if the device has no underlying file object.
        """
        return self._fobj.fileno()

    @property
    def closed(self) -> bool:
        """True if the stream is closed. Always succeeds"""
        return self._fobj is None or self._fobj.closed

    @property
    def is_blocking(self) -> bool:
        """
        True if the underlying OS is opened in blocking mode.
        Raises error if device is not opened.
        """
        return os.get_blocking(self.fileno())