Coverage for linuxpy/device.py: 94%

97 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2026-02-19 15:11 +0100

1# 

2# This file is part of the linuxpy project 

3# 

4# Copyright (c) 2023 Tiago Coutinho 

5# Distributed under the GPLv3 license. See LICENSE for more info. 

6 

7import contextlib 

8import logging 

9import os 

10import pathlib 

11from io import IOBase 

12 

13from linuxpy.types import Iterable, Optional, PathLike 

14 

15from .io import IO 

16 

17DEV_PATH = pathlib.Path("/dev") 

18 

19 

20log = logging.getLogger(__name__) 

21 

22 

23class ReentrantOpen(contextlib.AbstractContextManager): 

24 """Base for a reusable, reentrant (but not thread safe) open/close context manager""" 

25 

26 def __init__(self): 

27 self._context_level = 0 

28 

29 def __enter__(self): 

30 if not self._context_level: 

31 self.open() 

32 self._context_level += 1 

33 return self 

34 

35 def __exit__(self, *exc): 

36 self._context_level -= 1 

37 if not self._context_level: 

38 self.close() 

39 

40 def open(self): 

41 """Mandatory override for concrete sub-class""" 

42 raise NotImplementedError 

43 

44 def close(self): 

45 """Mandatory override for concrete sub-class""" 

46 raise NotImplementedError 

47 

48 

49def device_number(path: PathLike) -> Optional[int]: 

50 """Retrieves device number from a path like. Example: gives 12 for /dev/video12""" 

51 num = "" 

52 for c in str(path)[::-1]: 

53 if c.isdigit(): 

54 num = c + num 

55 else: 

56 break 

57 return int(num) if num else None 

58 

59 

60def is_device_file(path: PathLike, read_write: bool = True): 

61 """Check if path like is a readable (and, optionally, writable) character device.""" 

62 path = pathlib.Path(path) 

63 if not path.is_char_device(): 

64 return False 

65 access = os.R_OK | (os.W_OK if read_write else 0) 

66 if not os.access(str(path), access): 

67 return False 

68 return True 

69 

70 

71def iter_device_files(path: PathLike = "/dev", pattern: str = "*") -> Iterable[pathlib.Path]: 

72 """Iterable of accessible (read & write) char device files under the given path""" 

73 path = pathlib.Path(path) 

74 items = path.glob(pattern) 

75 

76 return filter(is_device_file, items) 

77 

78 

79class BaseDevice(ReentrantOpen): 

80 """Base class for a reentrant device""" 

81 

82 PREFIX = None 

83 

84 def __init__(self, name_or_file, read_write=True, io=IO, blocking=False): 

85 super().__init__() 

86 self.io = io 

87 self.blocking = blocking 

88 if isinstance(name_or_file, str | pathlib.Path): 

89 filename = pathlib.Path(name_or_file) 

90 self._read_write = read_write 

91 self._fobj = None 

92 elif isinstance(name_or_file, IOBase): 

93 filename = pathlib.Path(name_or_file.name) 

94 self._read_write = "+" in name_or_file.mode 

95 self._fobj = name_or_file 

96 # this object context manager won't close the file anymore 

97 self._context_level += 1 

98 self._on_open() 

99 else: 

100 raise TypeError( 

101 f"name_or_file must be a Path, str or a file-like object, not {name_or_file.__class__.__name__}" 

102 ) 

103 self.log = log.getChild(filename.stem) 

104 self.filename = filename 

105 self.index = device_number(filename) 

106 

107 def __repr__(self): 

108 return f"<{type(self).__name__} name={self.filename}, closed={self.closed}>" 

109 

110 def _on_open(self): 

111 pass 

112 

113 def _on_close(self): 

114 pass 

115 

116 @classmethod 

117 def from_id(cls, did: int, **kwargs): 

118 """Create a new Device from the given id""" 

119 return cls(f"{cls.PREFIX}{did}", **kwargs) 

120 

121 def open(self): 

122 """Open the device if not already open. Triggers _on_open after the underlying OS open has succeeded""" 

123 if not self._fobj: 

124 self.log.info("opening %s", self.filename) 

125 self._fobj = self.io.open(self.filename, self._read_write, blocking=self.blocking) 

126 self._on_open() 

127 self.log.info("opened %s", self.filename) 

128 

129 def close(self): 

130 """Closes the device if not already closed. Triggers _on_close before the underlying OS close has succeeded""" 

131 if not self.closed: 

132 self._on_close() 

133 self.log.info("closing %s", self.filename) 

134 self._fobj.close() 

135 self._fobj = None 

136 self.log.info("closed %s", self.filename) 

137 

138 def fileno(self) -> int: 

139 """Return the underlying file descriptor (an integer) of the stream if it exists""" 

140 return self._fobj.fileno() 

141 

142 @property 

143 def closed(self) -> bool: 

144 """True if the stream is closed. Always succeeds""" 

145 return self._fobj is None or self._fobj.closed 

146 

147 @property 

148 def is_blocking(self) -> bool: 

149 """ 

150 True if the underlying OS is opened in blocking mode. 

151 Raises error if device is not opened. 

152 """ 

153 return self.io.os.get_blocking(self.fileno()) 

154 

155 def set_blocking(self, yes_no: bool) -> None: 

156 """ 

157 Turns on/off blocking mode. 

158 Raises error if device is not opened. 

159 """ 

160 self.io.os.set_blocking(self.fileno(), yes_no) 

161 self.blocking = yes_no