Coverage for linuxpy/device.py: 94%

93 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-10-21 07:58 +0200

1# 

2# This file is part of the linuxpy project 

3# 

4# Copyright (c) 2023 Tiago Coutinho 

5# Distributed under the GPLv3 license. See LICENSE for more info. 

6 

7import contextlib 

8import logging 

9import os 

10import pathlib 

11from io import IOBase 

12 

13from linuxpy.types import Iterable, Optional, PathLike 

14 

15from .io import IO 

16 

17DEV_PATH = pathlib.Path("/dev") 

18 

19 

20log = logging.getLogger(__name__) 

21 

22 

23class ReentrantOpen(contextlib.AbstractContextManager): 

24 """Base for a reusable, reentrant (but not thread safe) open/close context manager""" 

25 

26 def __init__(self): 

27 self._context_level = 0 

28 

29 def __enter__(self): 

30 if not self._context_level: 

31 self.open() 

32 self._context_level += 1 

33 return self 

34 

35 def __exit__(self, *exc): 

36 self._context_level -= 1 

37 if not self._context_level: 

38 self.close() 

39 

40 def open(self): 

41 """Mandatory override for concrete sub-class""" 

42 raise NotImplementedError 

43 

44 def close(self): 

45 """Mandatory override for concrete sub-class""" 

46 raise NotImplementedError 

47 

48 

49def device_number(path: PathLike) -> Optional[int]: 

50 """Retrieves device number from a path like. Example: gives 12 for /dev/video12""" 

51 num = "" 

52 for c in str(path)[::-1]: 52 ↛ 57line 52 didn't jump to line 57 because the loop on line 52 didn't complete

53 if c.isdigit(): 

54 num = c + num 

55 else: 

56 break 

57 return int(num) if num else None 

58 

59 

60def is_device_file(path: PathLike, read_write: bool = True): 

61 """Check if path like is a readable (and, optionally, writable) character device.""" 

62 path = pathlib.Path(path) 

63 if not path.is_char_device(): 63 ↛ 64line 63 didn't jump to line 64 because the condition on line 63 was never true

64 return False 

65 access = os.R_OK | (os.W_OK if read_write else 0) 

66 if not os.access(str(path), access): 66 ↛ 67line 66 didn't jump to line 67 because the condition on line 66 was never true

67 return False 

68 return True 

69 

70 

71def iter_device_files(path: PathLike = "/dev", pattern: str = "*") -> Iterable[pathlib.Path]: 

72 """Iterable of accessible (read & write) char device files under the given path""" 

73 path = pathlib.Path(path) 

74 items = path.glob(pattern) 

75 

76 return filter(is_device_file, items) 

77 

78 

79class BaseDevice(ReentrantOpen): 

80 """Base class for a reentrant device""" 

81 

82 PREFIX = None 

83 

84 def __init__(self, name_or_file, read_write=True, io=IO): 

85 super().__init__() 

86 self.io = io 

87 if isinstance(name_or_file, (str, pathlib.Path)): 

88 filename = pathlib.Path(name_or_file) 

89 self._read_write = read_write 

90 self._fobj = None 

91 elif isinstance(name_or_file, IOBase): 

92 filename = pathlib.Path(name_or_file.name) 

93 self._read_write = "+" in name_or_file.mode 

94 self._fobj = name_or_file 

95 # this object context manager won't close the file anymore 

96 self._context_level += 1 

97 self._on_open() 

98 else: 

99 raise TypeError( 

100 f"name_or_file must be a Path, str or a file-like object, not {name_or_file.__class__.__name__}" 

101 ) 

102 self.log = log.getChild(filename.stem) 

103 self.filename = filename 

104 self.index = device_number(filename) 

105 

106 def __repr__(self): 

107 return f"<{type(self).__name__} name={self.filename}, closed={self.closed}>" 

108 

109 def _on_open(self): 

110 pass 

111 

112 def _on_close(self): 

113 pass 

114 

115 @classmethod 

116 def from_id(cls, did: int, **kwargs): 

117 """Create a new Device from the given id""" 

118 return cls(f"{cls.PREFIX}{did}", **kwargs) 

119 

120 def open(self): 

121 """Open the device if not already open. Triggers _on_open after the underlying OS open has succeeded""" 

122 if not self._fobj: 

123 self.log.info("opening %s", self.filename) 

124 self._fobj = self.io.open(self.filename, self._read_write) 

125 self._on_open() 

126 self.log.info("opened %s", self.filename) 

127 

128 def close(self): 

129 """Closes the device if not already closed. Triggers _on_close before the underlying OS open has succeeded""" 

130 if not self.closed: 

131 self._on_close() 

132 self.log.info("closing %s", self.filename) 

133 self._fobj.close() 

134 self._fobj = None 

135 self.log.info("closed %s", self.filename) 

136 

137 def fileno(self) -> int: 

138 """Return the underlying file descriptor (an integer) of the stream if it exists""" 

139 return self._fobj.fileno() 

140 

141 @property 

142 def closed(self) -> bool: 

143 """True if the stream is closed. Always succeeds""" 

144 return self._fobj is None or self._fobj.closed 

145 

146 @property 

147 def is_blocking(self) -> bool: 

148 """ 

149 True if the underlying OS is opened in blocking mode. 

150 Raises error if device is not opened. 

151 """ 

152 return os.get_blocking(self.fileno())