Coverage for linuxpy/util.py: 95%
169 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-10-21 07:58 +0200
« prev ^ index » next coverage.py v7.6.1, created at 2024-10-21 07:58 +0200
1#
2# This file is part of the linuxpy project
3#
4# Copyright (c) 2023 Tiago Coutinho
5# Distributed under the GPLv3 license. See LICENSE for more info.
7"""Utility functions used by the library. Mostly for internal usage"""
9import asyncio
10import contextlib
11import functools
12import operator
13import random
14import selectors
15import string
17from .types import (
18 AsyncIterator,
19 Callable,
20 Collection,
21 FDLike,
22 Iterable,
23 Iterator,
24 Optional,
25 Sequence,
26 T,
27 Union,
28)
#: Unique marker object used to tell "keyword argument was not passed" apart
#: from any real value a caller could supply (compare with ``is sentinel``)
sentinel = object()
def iter_chunks(lst: Sequence, size: int) -> Iterable:
    """
    Lazily split a sequence into consecutive slices of at most `size` items.

    Every slice holds exactly `size` items except possibly the last one,
    which holds whatever is left over. An empty sequence produces no slices.
    """
    offsets = range(0, len(lst), size)
    return (lst[offset : offset + size] for offset in offsets)
def chunks(lst: Sequence, size: int) -> tuple:
    """
    Split a sequence into a tuple of consecutive slices of at most `size` items.

    Every slice holds exactly `size` items except possibly the last one,
    which holds whatever is left over.
    """
    return tuple(lst[start : start + size] for start in range(0, len(lst), size))
def bcd_version_tuple(bcd: int) -> tuple[int, ...]:
    """
    Returns the tuple version of a BCD (binary-coded decimal) number.

    The number is written in hexadecimal, left padded with a zero to an even
    number of digits and every pair of digits is then read as a decimal
    number, so `0x0210` gives `(2, 10)`.
    """
    digits = format(bcd, "x")
    if len(digits) % 2:
        digits = "0" + digits
    return tuple(int(digits[pos : pos + 2]) for pos in range(0, len(digits), 2))
def bcd_version(bcd: int) -> str:
    """
    Returns the text version of a BCD (binary-coded decimal) number: the
    members given by `bcd_version_tuple` joined with dots.
    """
    members = bcd_version_tuple(bcd)
    return ".".join(map(str, members))
def to_fd(fd: FDLike):
    """Return a file descriptor from a file object.

    Parameters:
        fd -- file object (anything with a `fileno()` method) or an integer
              file descriptor

    Returns:
        corresponding non negative integer file descriptor

    Raises:
        ValueError if the object has no usable `fileno()` or the resulting
        descriptor is negative
    """
    if isinstance(fd, int):
        number = fd
    else:
        try:
            number = int(fd.fileno())
        except (AttributeError, TypeError, ValueError):
            # hide the low level error: callers only need to know the object is unusable
            raise ValueError(f"Invalid file object: {fd!r}") from None
    if number < 0:
        raise ValueError(f"Invalid file descriptor: {number}")
    return number
#: Parse text as a base 16 integer
int16 = functools.partial(int, base=16)


def try_numeric(text: str):
    """
    Try to translate the given text into a number.

    The conversions are attempted in this order: int (base 10), int (base 16)
    and float. The first one that succeeds wins, so text such as "1e5" is
    returned as the integer 0x1e5 and not as a float. If every conversion
    fails the original text is returned unchanged.

    Args:
        text (str): text to be translated

    Returns:
        int, float or str: the converted value or the original text
    """
    for convert in (int, int16, float):
        with contextlib.suppress(ValueError):
            return convert(text)
    return text
@contextlib.contextmanager
def add_reader_asyncio(fd: FDLike, callback: Callable, *args, loop: Optional[asyncio.AbstractEventLoop] = None):
    """
    Register `callback(*args)` as the reader of `fd` for the duration of the
    context and unregister it on exit, even if the body raises.

    When no loop is given the one returned by `asyncio.get_event_loop()` is
    used. The loop in use is the value bound by the `with` statement.
    """
    fileno = to_fd(fd)
    event_loop = asyncio.get_event_loop() if loop is None else loop
    event_loop.add_reader(fileno, callback, *args)
    try:
        yield event_loop
    finally:
        event_loop.remove_reader(fileno)
async def astream(fd: FDLike, read_func: Callable, max_buffer_size=10) -> AsyncIterator:
    """
    Asynchronous infinite stream of whatever `read_func()` returns.

    `read_func` is called (without arguments) from an event loop reader
    callback each time `fd` is reported as readable and its result is queued
    until the consumer asks for it. At most `max_buffer_size` results are
    buffered.
    """
    pending = asyncio.Queue(maxsize=max_buffer_size)

    def on_readable():
        # put_nowait raises asyncio.QueueFull when the consumer lags behind
        # by more than max_buffer_size items
        pending.put_nowait(read_func())

    with add_reader_asyncio(fd, on_readable):
        while True:
            yield await pending.get()
class aclosing(contextlib.AbstractAsyncContextManager):
    """Async context manager that hands out the wrapped object on entry and
    awaits its ``aclose()`` method on exit, whether or not the body raised.

    Typically used to make sure an async generator is finalized.

    This is a copy of contextlib.aclosing needed for python 3.9 compatibility
    """

    def __init__(self, thing):
        # the resource to finalize; must provide an awaitable aclose()
        self.thing = thing

    async def __aenter__(self):
        resource = self.thing
        return resource

    async def __aexit__(self, *exc_info):
        # nothing is returned, so exceptions raised in the body propagate
        resource = self.thing
        await resource.aclose()
def Selector(fds: Collection[FDLike], events=selectors.EVENT_READ) -> selectors.DefaultSelector:
    """
    A selectors.DefaultSelector with given fds registered.

    Args:
        fds: file objects or file descriptors to register
        events: selector event mask used for every registration
            (read events by default)

    Returns:
        the new selector. The caller is responsible for closing it.

    Raises:
        whatever `selector.register` raises (ex: ValueError for an invalid
        file descriptor, KeyError for a duplicate). In that case the
        partially populated selector is closed before the error propagates.
    """
    selector = selectors.DefaultSelector()
    try:
        for fd in fds:
            selector.register(fd, events)
    except BaseException:
        # nobody else holds a reference yet: release the selector's own
        # resources instead of leaking them until garbage collection
        selector.close()
        raise
    return selector
#: Bit mask of selector events (ex: selectors.EVENT_READ)
SelectorEventMask = int
#: One item produced by a selector's select(): the (key, events mask) pair
SelectorEvent = tuple[selectors.SelectorKey, SelectorEventMask]
def selector_stream(selector: selectors.BaseSelector, timeout: Optional[float] = None) -> Iterable[SelectorEvent]:
    """
    An infinite stream of selector events.

    `selector.select(timeout)` is called over and over and every
    (key, events) pair it reports is yielded in turn. A call that reports
    nothing simply leads to the next call.
    """
    while True:
        ready = selector.select(timeout)
        yield from ready
def selector_file_stream(fds: Collection[FDLike], timeout: Optional[float] = None) -> Iterable[SelectorEvent]:
    """
    An infinite stream of selector read events for the given files.

    A selector is created for `fds` and every (key, events) pair it reports
    is yielded.

    Args:
        fds: file objects or file descriptors to watch
        timeout: passed to each select call (None blocks until an event)
    """
    # the selector is private to this generator, so close it when the
    # generator is finalized instead of leaking it
    with Selector(fds) as selector:
        yield from selector_stream(selector, timeout)
def file_stream(fds: Collection[FDLike], timeout: Optional[float] = None) -> Iterable[FDLike]:
    """
    An infinite stream of read ready files.

    Yields the registered object (`key.fileobj`) of every event reported by
    `selector_file_stream(fds, timeout)`.
    """
    for key, _ in selector_file_stream(fds, timeout):
        yield key.fileobj
def event_stream(fds: Collection[FDLike], read: Callable[[FDLike], T], timeout: Optional[float] = None) -> Iterable[T]:
    """
    An infinite stream of events. The given read callable is called for each
    file that is reported as ready and its result is yielded.

    Args:
        fds: file objects or file descriptors to watch
        read: called with each ready file; its return value is the event
        timeout: passed down to the underlying file stream
    """
    # timeout used to be accepted but silently dropped; forward it
    for fd in file_stream(fds, timeout):
        yield read(fd)
async def async_selector_stream(selector: selectors.BaseSelector) -> AsyncIterator[SelectorEvent]:
    """
    An asynchronous infinite stream of selector events.

    The selector itself is watched through `astream`: whenever it is reported
    as readable `selector.select()` is called and each (key, events) pair of
    the resulting list is yielded.
    """
    async with aclosing(astream(selector, selector.select)) as batches:
        async for batch in batches:
            for selector_event in batch:
                yield selector_event
async def async_selector_file_stream(fds: Collection[FDLike]) -> AsyncIterator[SelectorEvent]:
    """
    An asynchronous infinite stream of selector read events for the given
    files.

    A selector is created for `fds` and every (key, events) pair reported
    through `async_selector_stream` is yielded.
    """
    # the selector is private to this generator: close it once the inner
    # stream has been finalized instead of leaking it
    with Selector(fds) as selector:
        async with aclosing(async_selector_stream(selector)) as stream:
            async for event in stream:
                yield event
async def async_file_stream(fds: Collection[FDLike]) -> AsyncIterator[FDLike]:
    """
    An asynchronous infinite stream of read ready files.

    Yields the registered object (`key.fileobj`) of every event reported by
    `async_selector_file_stream(fds)`.
    """
    async with aclosing(async_selector_file_stream(fds)) as selector_events:
        async for selector_key, _ in selector_events:
            yield selector_key.fileobj
async def async_event_stream(fds: Collection[FDLike], read: Callable[[FDLike], T]):
    """
    An asynchronous stream of events. The given read callable is called
    (synchronously) for each file that is reported as ready and its result
    is yielded.
    """
    async with aclosing(async_file_stream(fds)) as ready_files:
        async for ready_file in ready_files:
            yield read(ready_file)
def make_find(iter_devices: Callable[[], Iterator], needs_open=True) -> Callable:
    """
    Create a find function for the given callable. The callable should
    return an iterable where each element has the context manager capability
    (ie, it can be used in a with statement)

    Args:
        iter_devices: called without arguments on every find to obtain the
            candidate devices
        needs_open: when true each candidate is entered (`with dev:`) while
            the filters are evaluated on it

    Returns:
        `find(find_all=False, custom_match=None, **kwargs)`. Each keyword is
        an attribute name and the value it must be equal to. `custom_match`,
        if given, is called with each device that passed the keyword checks
        and its result decides whether the device is accepted. With
        `find_all` the (lazily filtered) devices are returned, otherwise the
        first accepted device or None.
    """

    def find(find_all=False, custom_match=None, **kwargs):
        idevs = iter_devices()
        if kwargs or custom_match:

            def simple_accept(dev):
                result = all(getattr(dev, key) == value for key, value in kwargs.items())
                if result and custom_match:
                    return custom_match(dev)
                return result

            if needs_open:

                def accept(dev):
                    with dev:
                        return simple_accept(dev)
            else:
                accept = simple_accept

            idevs = filter(accept, idevs)
        if find_all:
            return idevs
        # iter() is a no-op on iterators but lets iter_devices return any
        # iterable (ex: a list) when no filter wrapped it
        return next(iter(idevs), None)

    return find
class Version:
    """
    Simple version supporting only a.b.c format.

    Comparisons accept another Version, an int (one byte per member, see
    `from_number`), a "a.b.c" string or a tuple/list of 3 members. Comparing
    with anything else raises ValueError.
    """

    def __init__(self, major: int, minor: int, patch: int):
        self.major, self.minor, self.patch = major, minor, patch

    @classmethod
    def from_tuple(cls, sequence: Iterable[Union[str, int]]):
        """
        Create a Version from any sequence/iterable of 3 elements, each of
        them convertible to int

        Examples that create a Version "3.2.1" object:
        ```python
        Version.from_tuple([3, 2, 1])
        Version.from_tuple(["3", 2, "1"])
        Version.from_tuple(range(3, 0, -1))

        # This will work although not recommended since it will not work
        # when any of the members is bigger than 9
        Version.from_tuple("321")
        ```
        """
        return cls(*(int(member) for member in sequence))

    @classmethod
    def from_str(cls, text):
        """
        Create a Version from dot separated text

        Example that create a Version "3.2.1" object:
        ```python
        Version.from_str("3.2.1")
        ```
        """
        members = text.split(".", maxsplit=2)
        return cls.from_tuple(members)

    @classmethod
    def from_number(cls, number: int):
        """
        Create a Version from an integer where each member corresponds to
        one byte in the integer so that 0xFFFEFD corresponds to 255.254.253

        Example that create a Version "3.2.1" object:
        ```python
        Version.from_number((3<<16) + (2<<8) + 1)
        ```
        """
        return cls(*((number >> shift) & 0xFF for shift in (16, 8, 0)))

    def __int__(self):
        major, minor, patch = self.major, self.minor, self.patch
        return (major << 16) + (minor << 8) + patch

    def __repr__(self):
        return "{}.{}.{}".format(self.major, self.minor, self.patch)

    def __format__(self, fmt):
        # format the "a.b.c" text so the usual string format specs apply
        text = repr(self)
        return format(text, fmt)

    def __getitem__(self, item):
        members = self.tuple
        return members[item]

    def __eq__(self, other):
        theirs = self._try_convert(other)
        return self.tuple == theirs.tuple

    def __lt__(self, other):
        theirs = self._try_convert(other)
        return self.tuple < theirs.tuple

    def __le__(self, other):
        if self == other:
            return True
        return self < other

    def __gt__(self, other):
        return not (self <= other)

    def __ge__(self, other):
        return not (self < other)

    @classmethod
    def _try_convert(cls, value):
        """Build a Version out of any of the supported comparison operands"""
        if isinstance(value, cls):
            return value
        if isinstance(value, int):
            return cls.from_number(value)
        if isinstance(value, str):
            return cls.from_str(value)
        if isinstance(value, (tuple, list)):
            return cls.from_tuple(value)
        raise ValueError("Comparison with non-Version object")

    @property
    def tuple(self):
        """
        Returns a 3 element tuple representing the version
        so `Version(3,2,1).tuple` yields the tuple `(3, 2, 1)`
        """
        return self.major, self.minor, self.patch
def bit_indexes(number: int) -> list[int]:
    """
    Return the list of indexes (least significant bit is index 0) that have
    an active bit on the number, in increasing order.

    Example bit_indexes(74) gives [1, 3, 6] (74 == 0b1001010)
    """
    # binary text without the leading "0b", least significant digit first
    reversed_bits = bin(number)[:1:-1]
    return [position for position, bit in enumerate(reversed_bits) if bit == "1"]
def sequence_indexes(it: Iterable[T]) -> dict[T, int]:
    """Return a map with key being the value in the given iterable and value
    the index where it appears in the iterable. If the same element appears
    more than once then the index of the last occurrence is kept.
    Elements must be hashable so they can be used as keys of the returned map

    Example:

    `sequence_indexes([10, "bla", 1]) == {10: 0, "bla": 1, 1: 2}`

    Args:
        it (Iterable[T]): The elements to be indexed

    Returns:
        dict[T, int]: map of values and their indexes
    """
    positions = {}
    for position, item in enumerate(it):
        positions[item] = position
    return positions
def index_mask(indexes: dict[T, int], elements: Iterable[T]) -> int:
    """Return a bit mask with one active bit per given element. The bit index
    of each element is looked up in indexes

    Args:
        indexes (dict[T, int]): bit index map
        elements (Iterable[T]): active elements

    Returns:
        int: bit mask (0 when there are no elements)

    Raises:
        KeyError: if an element is not in indexes
    """
    mask = 0
    for element in elements:
        mask |= 1 << indexes[element]
    return mask
#: Characters allowed after the first character of a random name
ascii_alphanumeric = string.ascii_letters + string.digits


def random_name(min_length=32, max_length=32):
    """
    Generates a random name like text of ascii characters (letters or digits).

    The length is picked at random between min_length and max_length (both
    included). The first character is always a letter. A picked length of 0
    gives the empty string.
    """
    length = random.randint(min_length, max_length)
    if length == 0:
        return ""
    head = random.choice(string.ascii_letters)
    tail = random.choices(ascii_alphanumeric, k=length - 1)
    return head + "".join(tail)