Coverage for linuxpy/util.py: 95%

169 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-10-21 07:58 +0200

1# 

2# This file is part of the linuxpy project 

3# 

4# Copyright (c) 2023 Tiago Coutinho 

5# Distributed under the GPLv3 license. See LICENSE for more info. 

6 

7"""Utility functions used by the library. Mostly for internal usage""" 

8 

9import asyncio 

10import contextlib 

11import functools 

12import operator 

13import random 

14import selectors 

15import string 

16 

17from .types import ( 

18 AsyncIterator, 

19 Callable, 

20 Collection, 

21 FDLike, 

22 Iterable, 

23 Iterator, 

24 Optional, 

25 Sequence, 

26 T, 

27 Union, 

28) 

29 

#: unique marker used to tell an omitted keyword argument apart from a passed one
sentinel = object()


def iter_chunks(lst: Sequence, size: int) -> Iterable:
    """
    Lazily split the sequence into consecutive slices of `size` elements.
    The final slice holds whatever is left and may be shorter than `size`.
    """
    starts = range(0, len(lst), size)
    return (lst[start : start + size] for start in starts)

40 

41 

def chunks(lst: Sequence, size: int) -> tuple:
    """
    Eagerly split the sequence into consecutive slices of `size` elements and
    return them as a tuple. The final slice may be shorter than `size`.
    """
    batches = iter_chunks(lst, size)
    return tuple(batches)

48 

49 

def bcd_version_tuple(bcd: int) -> tuple[int, ...]:
    """Returns the tuple version of a BCD (binary-coded decimal) number.

    The number is rendered as hexadecimal text, left-padded with one zero when
    it has an odd number of digits, and every pair of digits becomes one member
    of the tuple (so 0x0110 gives (1, 10)).
    """
    digits = f"{bcd:x}"
    if len(digits) % 2 != 0:
        digits = "0" + digits
    return tuple(map(int, iter_chunks(digits, 2)))

57 

58 

def bcd_version(bcd: int) -> str:
    """Returns the dotted text version of a BCD (binary-coded decimal) number"""
    members = bcd_version_tuple(bcd)
    return ".".join(map(str, members))

62 

63 

def to_fd(fd: FDLike):
    """Return the integer file descriptor behind a file-like object.

    Parameters:
    fd -- an integer file descriptor, or any object with a fileno() method

    Returns:
    the corresponding non-negative integer file descriptor

    Raises:
    ValueError if no descriptor can be obtained from the object or if the
    descriptor is negative
    """
    if isinstance(fd, int):
        number = fd
    else:
        try:
            number = int(fd.fileno())
        except (AttributeError, TypeError, ValueError):
            # hide the low level error: callers only need to know the object is unusable
            raise ValueError(f"Invalid file object: {fd!r}") from None
    if number < 0:
        raise ValueError(f"Invalid file descriptor: {number}")
    return number

84 

85 

#: int() preconfigured to parse text as a base 16 number
int16 = functools.partial(int, base=16)


def try_numeric(text: str):
    """
    Try to translate the given text into an int, a base 16 int or a float
    (attempted in that order). Returns the original text if every
    conversion fails.

    Args:
        text (str): text to be translated

    Returns:
        int, float or str: The converted text
    """
    for convert in (int, int16, float):
        # a failed conversion just means "try the next candidate"
        with contextlib.suppress(ValueError):
            return convert(text)
    return text

106 

107 

@contextlib.contextmanager
def add_reader_asyncio(fd: FDLike, callback: Callable, *args, loop: Optional[asyncio.AbstractEventLoop] = None):
    """Add reader during the context and remove it after.

    `callback(*args)` is registered on the event loop as the reader for `fd`.
    When no loop is given the one returned by `asyncio.get_event_loop()` is
    used. The context yields the loop; the reader is removed when the context
    exits, even if its body raises.
    """
    descriptor = to_fd(fd)
    event_loop = asyncio.get_event_loop() if loop is None else loop
    event_loop.add_reader(descriptor, callback, *args)
    try:
        yield event_loop
    finally:
        event_loop.remove_reader(descriptor)

119 

120 

async def astream(fd: FDLike, read_func: Callable, max_buffer_size=10) -> AsyncIterator:
    """An asynchronous infinite stream of the values returned by `read_func`.

    While the stream is active `fd` is registered as a reader on the event
    loop. Each time it is reported as readable, `read_func()` is called without
    arguments and its result is buffered in a queue holding at most
    `max_buffer_size` items, from where it is yielded to the consumer.
    Note that `put_nowait` raises `asyncio.QueueFull` inside the reader
    callback when the buffer is already full.
    """
    buffer = asyncio.Queue(maxsize=max_buffer_size)

    def on_readable():
        buffer.put_nowait(read_func())

    with add_reader_asyncio(fd, on_readable):
        while True:
            yield await buffer.get()

131 

132 

class aclosing(contextlib.AbstractAsyncContextManager):
    """Async context manager that guarantees an asynchronously cleaned-up
    resource (typically an async generator) gets its ``aclose()`` awaited
    when the ``async with`` block ends.

    This is a copy of contextlib.aclosing needed for python 3.9 compatibility
    """

    def __init__(self, thing):
        # the wrapped resource; it must provide an awaitable aclose()
        self.thing = thing

    async def __aenter__(self):
        resource = self.thing
        return resource

    async def __aexit__(self, *exc_info):
        # always close; returning None lets any exception from the block propagate
        resource = self.thing
        await resource.aclose()

148 

149 

def Selector(fds: Collection[FDLike], events=selectors.EVENT_READ) -> selectors.DefaultSelector:
    """A selectors.DefaultSelector with given fds registered.

    Args:
        fds: file descriptors (or objects with a fileno() method) to register
        events: selector event mask each of them is registered for
            (read events by default)

    Returns:
        the new selector. The caller owns it and is responsible for closing it.

    Raises:
        whatever `selector.register()` raises (for example ValueError for an
        invalid file descriptor or KeyError for a duplicate). In that case the
        partially built selector is closed before the error propagates.
    """
    selector = selectors.DefaultSelector()
    try:
        for fd in fds:
            selector.register(fd, events)
    except BaseException:
        # nobody else holds a reference yet: release the selector's resources here
        selector.close()
        raise
    return selector

156 

157 

#: bit mask of selector events reported for a key
SelectorEventMask = int
#: one item of the list returned by a selector's select(): the key and its event mask
SelectorEvent = tuple[selectors.SelectorKey, SelectorEventMask]


def selector_stream(selector: selectors.BaseSelector, timeout: Optional[float] = None) -> Iterable[SelectorEvent]:
    """An infinite stream of selector events.

    The selector is polled over and over with the given timeout and every
    event it reports is yielded. A poll that reports nothing yields nothing
    and is simply followed by the next poll.
    """
    while True:
        for event in selector.select(timeout):
            yield event

166 

167 

def selector_file_stream(fds: Collection[FDLike], timeout: Optional[float] = None) -> Iterable[SelectorEvent]:
    """An infinite stream of selector read events for the given files.

    A selector is created with all `fds` registered and is polled with the
    given timeout. The selector is owned by this generator: it is closed as
    soon as the generator is closed or finalized.
    """
    # the with statement makes sure the selector resources are released
    # when the consumer stops iterating
    with Selector(fds) as selector:
        yield from selector_stream(selector, timeout)

171 

172 

def file_stream(fds: Collection[FDLike], timeout: Optional[float] = None) -> Iterable[FDLike]:
    """An infinite stream of read ready file descriptors.

    Each item is the object, as given in `fds`, that was reported as ready.
    """
    for key, _ in selector_file_stream(fds, timeout):
        yield key.fileobj

176 

177 

def event_stream(fds: Collection[FDLike], read: Callable[[FDLike], T], timeout: Optional[float] = None) -> Iterable[T]:
    """An infinite stream of events. The given read callable is called for each file
    that is reported as ready and its result is the event that gets yielded.

    Args:
        fds: files to wait on
        read: callable receiving the ready file and returning the event
        timeout: maximum time each wait for ready files may take
            (None means wait without limit)
    """
    # forward the timeout: it used to be accepted and then silently dropped
    for fd in file_stream(fds, timeout):
        yield read(fd)

182 

183 

async def async_selector_stream(selector: selectors.BaseSelector) -> AsyncIterator[SelectorEvent]:
    """An asynchronous infinite stream of selector read events.

    The selector itself is handed to `astream` as the file to watch, with
    `selector.select` as the read function. Every batch of events produced
    that way is flattened so that events are yielded one at a time.
    """
    async with aclosing(astream(selector, selector.select)) as batches:
        async for batch in batches:
            for selector_event in batch:
                yield selector_event

190 

191 

async def async_selector_file_stream(fds: Collection[FDLike]) -> AsyncIterator[SelectorEvent]:
    """An asynchronous infinite stream of selector read events for the given files.

    A selector is created with all `fds` registered. It is owned by this
    generator: once the generator is closed the inner stream is shut down
    first and the selector is closed right after.
    """
    # outer `with` closes the selector only after aclosing() has finished
    # tearing down the stream that watches it
    with Selector(fds) as selector:
        async with aclosing(async_selector_stream(selector)) as stream:
            async for event in stream:
                yield event

198 

199 

async def async_file_stream(fds: Collection[FDLike]) -> AsyncIterator[FDLike]:
    """An asynchronous infinite stream of read ready files.

    Each item is the object, as given in `fds`, that was reported as ready.
    """
    async with aclosing(async_selector_file_stream(fds)) as selector_events:
        async for selector_key, _ in selector_events:
            yield selector_key.fileobj

205 

206 

async def async_event_stream(fds: Collection[FDLike], read: Callable[[FDLike], T]):
    """An asynchronous stream of events. The given read callable is called for each file
    that is reported as ready and its result is the event that gets yielded."""
    async with aclosing(async_file_stream(fds)) as ready_files:
        async for ready in ready_files:
            yield read(ready)

213 

214 

def make_find(iter_devices: Callable[[], Iterator], needs_open=True) -> Callable:
    """
    Create a find function for the given callable. The callable should
    return an iterator where each element has the context manager capability
    (ie, it can be used in a with statement).

    The returned `find(find_all=False, custom_match=None, **kwargs)` keeps the
    devices whose attributes equal all the given keyword arguments and, when
    given, for which `custom_match(device)` is true. With `find_all` it returns
    an iterator over the matches, otherwise the first match or None.
    When `needs_open` is true each candidate is checked inside its own
    `with` block.
    """

    def find(find_all=False, custom_match=None, **kwargs):
        devices = iter_devices()
        if kwargs or custom_match:

            def matches(dev):
                # every requested attribute must match before custom_match is consulted
                for key, value in kwargs.items():
                    if not (getattr(dev, key) == value):
                        return False
                if custom_match:
                    return custom_match(dev)
                return True

            if needs_open:

                def accept(dev):
                    with dev:
                        return matches(dev)
            else:
                accept = matches

            devices = filter(accept, devices)
        if find_all:
            return devices
        return next(devices, None)

    return find

244 

245 

class Version:
    """Simple version supporting only a.b.c format.

    A version can be compared with another version or with anything that can
    be converted to one: an int (see `from_number`), a str (see `from_str`)
    or a tuple/list (see `from_tuple`). Comparing with an object of any other
    type raises ValueError.
    """

    def __init__(self, major: int, minor: int, patch: int):
        self.major = major
        self.minor = minor
        self.patch = patch

    @classmethod
    def from_tuple(cls, sequence: Iterable[Union[str, int]]):
        """
        Create a Version from any sequence/iterable of 3 elements.
        Each element is converted with int().

        Examples that create a Version "3.2.1" object:
        ```python
        Version.from_tuple([3, 2, 1])
        Version.from_tuple(["3", 2, "1"])
        Version.from_tuple(range(3, 0, -1))

        # This will work although not recommended since it will not work
        # when any of the members is bigger than 9
        Version.from_tuple("321")
        ```
        """
        members = (int(member) for member in sequence)
        return cls(*members)

    @classmethod
    def from_str(cls, text):
        """
        Create a Version from dot separated text

        Example that create a Version "3.2.1" object:
        ```python
        Version.from_str("3.2.1")
        ```
        """
        parts = text.split(".", 2)
        return cls.from_tuple(parts)

    @classmethod
    def from_number(cls, number: int):
        """
        Create a Version from an integer where each member corresponds to
        one byte in the integer so that 0xFFFEFD corresponds to 255.254.253

        Example that create a Version "3.2.1" object:
        ```python
        Version.from_number((3<<16) + (2<<8) + 1)
        ```
        """
        members = ((number >> shift) & 0xFF for shift in (16, 8, 0))
        return cls(*members)

    def __int__(self):
        major, minor, patch = self.tuple
        return (major << 16) + (minor << 8) + patch

    def __repr__(self):
        return "{}.{}.{}".format(*self.tuple)

    def __format__(self, fmt):
        # format the textual form so string format specs (width, alignment) apply
        text = repr(self)
        return format(text, fmt)

    def __getitem__(self, item):
        members = self.tuple
        return members[item]

    def __eq__(self, other):
        mine = self.tuple
        theirs = self._try_convert(other).tuple
        return mine == theirs

    def __lt__(self, other):
        mine = self.tuple
        theirs = self._try_convert(other).tuple
        return mine < theirs

    def __le__(self, other):
        if self == other:
            return True
        return self < other

    def __gt__(self, other):
        return not (self <= other)

    def __ge__(self, other):
        return not (self < other)

    @classmethod
    def _try_convert(cls, value):
        """Return value as a Version; raise ValueError for unsupported types"""
        if isinstance(value, cls):
            return value
        converters = (
            (int, cls.from_number),
            (str, cls.from_str),
            ((tuple, list), cls.from_tuple),
        )
        for kind, convert in converters:
            if isinstance(value, kind):
                return convert(value)
        raise ValueError("Comparison with non-Version object")

    @property
    def tuple(self):
        """
        Returns a 3 element tuple representing the version
        so `Version(3,2,1).tuple` yields the tuple `(3, 2, 1)`
        """
        return self.major, self.minor, self.patch

343 

344 

def bit_indexes(number: int) -> list[int]:
    """
    Return the list of indexes that have an active bit on the number,
    index 0 being the least significant bit.

    Example bit_indexes(74) gives [1, 3, 6] (74 == 0b1001010)
    """
    # walk the binary digits from the least significant one; any sign
    # character ends up last and never matches "1"
    least_significant_first = reversed(f"{number:b}")
    return [index for index, digit in enumerate(least_significant_first) if digit == "1"]

352 

353 

def sequence_indexes(it: Iterable[T]) -> dict[T, int]:
    """Return a map with key being the value in the given iterable and value
    the index where it appears in the iterable. If the same element appears
    more than once then the last occurrence is returned.
    Elements must be hashable so they can be used as keys of the returned map
    Example:

    `sequence_indexes([10, "bla", 1]) == {10: 0, "bla": 1, 1: 2}`

    Args:
        it (Iterable[T]): The iterable to be indexed

    Returns:
        dict[T, int]: map of values and their indexes
    """
    positions = {}
    for position, element in enumerate(it):
        # later occurrences overwrite earlier ones
        positions[element] = position
    return positions

370 

371 

def index_mask(indexes: dict[T, int], elements: Iterable[T]) -> int:
    """Return a bit mask with one active bit per given element. The position
    of the bit for each element is looked up in the indexes map.

    Args:
        indexes (dict[T, int]): bit index map
        elements (Iterable[T]): list of active elements

    Returns:
        int: bit mask (0 when there are no elements)
    """
    mask = 0
    for element in elements:
        mask |= 1 << indexes[element]
    return mask

384 

385 

#: characters allowed in a random name: ascii letters followed by digits
ascii_alphanumeric = string.ascii_letters + string.digits


def random_name(min_length=32, max_length=32):
    """
    Generates a random name like text of ascii characters (letters or digits).
    Its length is picked at random between min_length and max_length (both
    included); a picked length of zero gives an empty text.

    The first character is always a letter
    """
    length = random.randint(min_length, max_length)
    if not length:
        return ""
    head = random.choice(string.ascii_letters)
    tail = random.choices(ascii_alphanumeric, k=length - 1)
    return head + "".join(tail)