Coverage for linuxpy/util.py: 98%

164 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2025-05-27 13:54 +0200

1# 

2# This file is part of the linuxpy project 

3# 

4# Copyright (c) 2023 Tiago Coutinho 

5# Distributed under the GPLv3 license. See LICENSE for more info. 

6 

7"""Utility functions used by the library. Mostly for internal usage""" 

8 

9import asyncio 

10import contextlib 

11import functools 

12import operator 

13import random 

14import selectors 

15import string 

16 

17from .types import ( 

18 AsyncIterator, 

19 Callable, 

20 Collection, 

21 FDLike, 

22 Iterable, 

23 Iterator, 

24 Optional, 

25 Sequence, 

26 T, 

27 Union, 

28) 

29 

30#: used to distinguish passing or not a kwargs 

31sentinel = object() 

32 

33 

34def iter_chunks(lst: Sequence, size: int) -> Iterable: 

35 """ 

36 Batch data from the sequence into groups of length n. 

37 The last batch may be shorter than size. 

38 """ 

39 return (lst[i : i + size] for i in range(0, len(lst), size)) 

40 

41 

42def chunks(lst: Sequence, size: int) -> tuple: 

43 """ 

44 Batch data from the sequence into groups of length n. 

45 The last batch may be shorter than size. 

46 """ 

47 return tuple(iter_chunks(lst, size)) 

48 

49 

50def bcd_version_tuple(bcd: int) -> tuple[int, ...]: 

51 """Returns the tuple version of a BCD (binary-coded decimal) number""" 

52 text = f"{bcd:x}" 

53 text_size = len(text) 

54 if text_size % 2: 

55 text = "0" + text 

56 return tuple(int(i) for i in iter_chunks(text, 2)) 

57 

58 

59def bcd_version(bcd: int) -> str: 

60 """Returns the text version of a BCD (binary-coded decimal) number""" 

61 return ".".join(str(i) for i in bcd_version_tuple(bcd)) 

62 

63 

64def to_fd(fd: FDLike): 

65 """Return a file descriptor from a file object. 

66 

67 Parameters: 

68 fd -- file object or file descriptor 

69 

70 Returns: 

71 corresponding file descriptor 

72 

73 Raises: 

74 ValueError if the object is invalid 

75 """ 

76 if not isinstance(fd, int): 

77 try: 

78 fd = int(fd.fileno()) 

79 except (AttributeError, TypeError, ValueError): 

80 raise ValueError(f"Invalid file object: {fd!r}") from None 

81 if fd < 0: 

82 raise ValueError(f"Invalid file descriptor: {fd}") 

83 return fd 

84 

85 

86int16 = functools.partial(int, base=16) 

87 

88 

89def try_numeric(text: str, order=(int, int16, float)): 

90 """ 

91 Try to translate given text into int, int base 16 or float. 

92 Returns the orig and return the original text if it fails. 

93 

94 Args: 

95 text (str): text to be translated 

96 

97 Returns: 

98 int, float or str: The converted text 

99 """ 

100 for func in order: 

101 try: 

102 return func(text) 

103 except ValueError: 

104 pass 

105 return text 

106 

107 

108@contextlib.contextmanager 

109def add_reader_asyncio(fd: FDLike, callback: Callable, *args, loop: Optional[asyncio.AbstractEventLoop] = None): 

110 """Add reader during the context and remove it after""" 

111 fd = to_fd(fd) 

112 if loop is None: 

113 loop = asyncio.get_event_loop() 

114 loop.add_reader(fd, callback, *args) 

115 try: 

116 yield loop 

117 finally: 

118 loop.remove_reader(fd) 

119 

120 

121async def astream(fd: FDLike, read_func: Callable, max_buffer_size=10) -> AsyncIterator: 

122 queue = asyncio.Queue(maxsize=max_buffer_size) 

123 

124 def feed(): 

125 queue.put_nowait(read_func()) 

126 

127 with add_reader_asyncio(fd, feed): 

128 while True: 

129 event = await queue.get() 

130 yield event 

131 

132 

133def Selector(fds: Collection[FDLike], events=selectors.EVENT_READ) -> selectors.DefaultSelector: 

134 """A selectors.DefaultSelector with given fds registered""" 

135 selector = selectors.DefaultSelector() 

136 for fd in fds: 

137 selector.register(fd, events) 

138 return selector 

139 

140 

141SelectorEventMask = int 

142SelectorEvent = tuple[selectors.SelectorKey, SelectorEventMask] 

143 

144 

145def selector_stream(selector: selectors.BaseSelector, timeout: Optional[float] = None) -> Iterable[SelectorEvent]: 

146 """A stream of selector read events""" 

147 while True: 

148 yield from selector.select(timeout) 

149 

150 

151def selector_file_stream(fds: Collection[FDLike], timeout: Optional[float] = None) -> Iterable[SelectorEvent]: 

152 """An inifinte stream of selector read events""" 

153 yield from selector_stream(Selector(fds), timeout) 

154 

155 

156def file_stream(fds: Collection[FDLike], timeout: Optional[float] = None) -> Iterable[FDLike]: 

157 """An infinite stream of read ready file descriptors""" 

158 yield from (key.fileobj for key, _ in selector_file_stream(fds, timeout)) 

159 

160 

161def event_stream(fds: Collection[FDLike], read: Callable[[FDLike], T], timeout: Optional[float] = None) -> Iterable[T]: 

162 """An infinite stream of events. The given read callable is called for each file 

163 that is reported as ready""" 

164 yield from (read(fd) for fd in file_stream(fds)) 

165 

166 

167async def async_selector_stream(selector: selectors.BaseSelector) -> AsyncIterator[SelectorEvent]: 

168 """An asyncronous infinite stream of selector read events""" 

169 async with contextlib.aclosing(astream(selector, selector.select)) as stream: 

170 async for events in stream: 

171 for event in events: 

172 yield event 

173 

174 

175async def async_selector_file_stream(fds: Collection[FDLike]) -> AsyncIterator[SelectorEvent]: 

176 """An asyncronous infinite stream of selector read events""" 

177 selector = Selector(fds) 

178 async with contextlib.aclosing(async_selector_stream(selector)) as stream: 

179 async for event in stream: 

180 yield event 

181 

182 

183async def async_file_stream(fds: Collection[FDLike]) -> AsyncIterator[FDLike]: 

184 """An asyncronous infinite stream of read ready files""" 

185 async with contextlib.aclosing(async_selector_file_stream(fds)) as stream: 

186 async for key, _ in stream: 

187 yield key.fileobj 

188 

189 

190async def async_event_stream(fds: Collection[FDLike], read: Callable[[FDLike], T]): 

191 """An asyncronous stream of events. The given read callable is called for each file 

192 that is reported as ready""" 

193 async with contextlib.aclosing(async_file_stream(fds)) as stream: 

194 async for fd in stream: 

195 yield read(fd) 

196 

197 

198def make_find(iter_devices: Callable[[], Iterator], needs_open=True) -> Callable: 

199 """ 

200 Create a find function for the given callable. The callable should 

201 return an iterable where each element has the context manager capability 

202 (ie, it can be used in a with statement) 

203 """ 

204 

205 def find(find_all=False, custom_match=None, **kwargs): 

206 idevs = iter_devices() 

207 if kwargs or custom_match: 

208 

209 def simple_accept(dev): 

210 result = all(getattr(dev, key) == value for key, value in kwargs.items()) 

211 if result and custom_match: 

212 return custom_match(dev) 

213 return result 

214 

215 if needs_open: 

216 

217 def accept(dev): 

218 with dev: 

219 return simple_accept(dev) 

220 else: 

221 accept = simple_accept 

222 

223 idevs = filter(accept, idevs) 

224 return idevs if find_all else next(idevs, None) 

225 

226 return find 

227 

228 

229class Version: 

230 """Simple version supporting only a.b.c format""" 

231 

232 def __init__(self, major: int, minor: int, patch: int): 

233 self.major = major 

234 self.minor = minor 

235 self.patch = patch 

236 

237 @classmethod 

238 def from_tuple(cls, sequence: Iterable[Union[str, int]]): 

239 """ 

240 Create a Version from any sequence/iterable of 3 elements 

241 

242 Examples that create a Version "3.2.1" object: 

243 ```python 

244 Version.from_tuple([3, 2, 1]) 

245 Version.from_tuple(["3", 2, "1"]) 

246 Version.from_tuple(range(3, 0, -1)) 

247 

248 # This will work although not recommended since it will not work 

249 # when any of the members is bigger than 9 

250 Version.from_tuple("321") 

251 ``` 

252 """ 

253 return cls(*map(int, sequence)) 

254 

255 @classmethod 

256 def from_str(cls, text): 

257 """ 

258 Create a Version from text 

259 

260 Example that create a Version "3.2.1" object: 

261 ```python 

262 Version.from_str("3.2.1") 

263 ``` 

264 """ 

265 return cls.from_tuple(text.split(".", 2)) 

266 

267 @classmethod 

268 def from_number(cls, number: int): 

269 """ 

270 Create a Version from an integer where each member corresponds to 

271 one byte in the integer so that 0xFFFEFD corresponds to 255.254.253 

272 

273 Example that create a Version "3.2.1" object: 

274 ```python 

275 Version.from_number((3<<16) + (2<<8) + 1) 

276 ``` 

277 """ 

278 return cls((number >> 16) & 0xFF, (number >> 8) & 0xFF, number & 0xFF) 

279 

280 def __int__(self): 

281 return (self.major << 16) + (self.minor << 8) + self.patch 

282 

283 def __repr__(self): 

284 return f"{self.major}.{self.minor}.{self.patch}" 

285 

286 def __format__(self, fmt): 

287 return format(repr(self), fmt) 

288 

289 def __getitem__(self, item): 

290 return self.tuple[item] 

291 

292 def __eq__(self, other): 

293 return self.tuple == self._try_convert(other).tuple 

294 

295 def __lt__(self, other): 

296 return self.tuple < self._try_convert(other).tuple 

297 

298 def __le__(self, other): 

299 return self == other or self < other 

300 

301 def __gt__(self, other): 

302 return not self <= other 

303 

304 def __ge__(self, other): 

305 return not self < other 

306 

307 @classmethod 

308 def _try_convert(cls, value): 

309 if isinstance(value, cls): 

310 return value 

311 elif isinstance(value, int): 

312 return cls.from_number(value) 

313 elif isinstance(value, str): 

314 return cls.from_str(value) 

315 elif isinstance(value, (tuple, list)): 

316 return cls.from_tuple(value) 

317 raise ValueError("Comparison with non-Version object") 

318 

319 @property 

320 def tuple(self): 

321 """ 

322 Returns a 3 element tuple representing the version 

323 so `Version(3,2,1).tuple()` yields the tuple `(3, 2, 1)` 

324 """ 

325 return self.major, self.minor, self.patch 

326 

327 

328def bit_indexes(number: int) -> list[int]: 

329 """ 

330 Return the list of indexes that have an active bit on the number. 

331 

332 Example bit_indexes(74) gives [1, 3, 6] (74 == 0b1001010) 

333 """ 

334 return [i for i, c in enumerate(bin(number)[:1:-1]) if c == "1"] 

335 

336 

337def sequence_indexes(it: Iterable[T]) -> dict[T, int]: 

338 """Return a map with key being the value in the given iterable and value 

339 the index where it appears in the map. If the same element appears more 

340 than once then the last occurence is returned. 

341 Elements must be non mutable so they can appear in the returned map 

342 Example: 

343 

344 `sequence_indexes([10, "bla", 1]) == {10: 0, "bla": 1, 1: 2}` 

345 

346 Args: 

347 lines (Iterable[int]): The iterable to be returned 

348 

349 Returns: 

350 dict[int, int]: map of values and their indexes 

351 """ 

352 return {elem: index for index, elem in enumerate(it)} 

353 

354 

355def index_mask(indexes: dict[T, int], elements: Iterable[T]) -> int: 

356 """Return a bit mask with active bits from elements. The bit index 

357 for each element is given by the indexes 

358 

359 Args: 

360 indexes (dict[T, int]): bit index map 

361 elements (Iterable[T]): list of active elements 

362 

363 Returns: 

364 int: bit mask 

365 """ 

366 return functools.reduce(operator.or_, (1 << indexes[element] for element in elements), 0) 

367 

368 

369ascii_alphanumeric = string.ascii_letters + string.digits 

370 

371 

372def random_name(min_length=32, max_length=32): 

373 """ 

374 Generates a random name like text of ascii characters (letters or digits). 

375 

376 The first character is always a letter 

377 """ 

378 if not (k := random.randint(min_length, max_length)): 

379 return "" 

380 first = random.choice(string.ascii_letters) 

381 return first + "".join(random.choices(ascii_alphanumeric, k=k - 1))