Coverage for linuxpy/util.py: 98%
164 statements
« prev ^ index » next coverage.py v7.6.8, created at 2025-05-27 13:54 +0200
« prev ^ index » next coverage.py v7.6.8, created at 2025-05-27 13:54 +0200
1#
2# This file is part of the linuxpy project
3#
4# Copyright (c) 2023 Tiago Coutinho
5# Distributed under the GPLv3 license. See LICENSE for more info.
7"""Utility functions used by the library. Mostly for internal usage"""
9import asyncio
10import contextlib
11import functools
12import operator
13import random
14import selectors
15import string
17from .types import (
18 AsyncIterator,
19 Callable,
20 Collection,
21 FDLike,
22 Iterable,
23 Iterator,
24 Optional,
25 Sequence,
26 T,
27 Union,
28)
30#: used to distinguish passing or not a kwargs
31sentinel = object()
34def iter_chunks(lst: Sequence, size: int) -> Iterable:
35 """
36 Batch data from the sequence into groups of length n.
37 The last batch may be shorter than size.
38 """
39 return (lst[i : i + size] for i in range(0, len(lst), size))
42def chunks(lst: Sequence, size: int) -> tuple:
43 """
44 Batch data from the sequence into groups of length n.
45 The last batch may be shorter than size.
46 """
47 return tuple(iter_chunks(lst, size))
50def bcd_version_tuple(bcd: int) -> tuple[int, ...]:
51 """Returns the tuple version of a BCD (binary-coded decimal) number"""
52 text = f"{bcd:x}"
53 text_size = len(text)
54 if text_size % 2:
55 text = "0" + text
56 return tuple(int(i) for i in iter_chunks(text, 2))
59def bcd_version(bcd: int) -> str:
60 """Returns the text version of a BCD (binary-coded decimal) number"""
61 return ".".join(str(i) for i in bcd_version_tuple(bcd))
64def to_fd(fd: FDLike):
65 """Return a file descriptor from a file object.
67 Parameters:
68 fd -- file object or file descriptor
70 Returns:
71 corresponding file descriptor
73 Raises:
74 ValueError if the object is invalid
75 """
76 if not isinstance(fd, int):
77 try:
78 fd = int(fd.fileno())
79 except (AttributeError, TypeError, ValueError):
80 raise ValueError(f"Invalid file object: {fd!r}") from None
81 if fd < 0:
82 raise ValueError(f"Invalid file descriptor: {fd}")
83 return fd
86int16 = functools.partial(int, base=16)
89def try_numeric(text: str, order=(int, int16, float)):
90 """
91 Try to translate given text into int, int base 16 or float.
92 Returns the orig and return the original text if it fails.
94 Args:
95 text (str): text to be translated
97 Returns:
98 int, float or str: The converted text
99 """
100 for func in order:
101 try:
102 return func(text)
103 except ValueError:
104 pass
105 return text
108@contextlib.contextmanager
109def add_reader_asyncio(fd: FDLike, callback: Callable, *args, loop: Optional[asyncio.AbstractEventLoop] = None):
110 """Add reader during the context and remove it after"""
111 fd = to_fd(fd)
112 if loop is None:
113 loop = asyncio.get_event_loop()
114 loop.add_reader(fd, callback, *args)
115 try:
116 yield loop
117 finally:
118 loop.remove_reader(fd)
121async def astream(fd: FDLike, read_func: Callable, max_buffer_size=10) -> AsyncIterator:
122 queue = asyncio.Queue(maxsize=max_buffer_size)
124 def feed():
125 queue.put_nowait(read_func())
127 with add_reader_asyncio(fd, feed):
128 while True:
129 event = await queue.get()
130 yield event
133def Selector(fds: Collection[FDLike], events=selectors.EVENT_READ) -> selectors.DefaultSelector:
134 """A selectors.DefaultSelector with given fds registered"""
135 selector = selectors.DefaultSelector()
136 for fd in fds:
137 selector.register(fd, events)
138 return selector
141SelectorEventMask = int
142SelectorEvent = tuple[selectors.SelectorKey, SelectorEventMask]
145def selector_stream(selector: selectors.BaseSelector, timeout: Optional[float] = None) -> Iterable[SelectorEvent]:
146 """A stream of selector read events"""
147 while True:
148 yield from selector.select(timeout)
151def selector_file_stream(fds: Collection[FDLike], timeout: Optional[float] = None) -> Iterable[SelectorEvent]:
152 """An inifinte stream of selector read events"""
153 yield from selector_stream(Selector(fds), timeout)
156def file_stream(fds: Collection[FDLike], timeout: Optional[float] = None) -> Iterable[FDLike]:
157 """An infinite stream of read ready file descriptors"""
158 yield from (key.fileobj for key, _ in selector_file_stream(fds, timeout))
161def event_stream(fds: Collection[FDLike], read: Callable[[FDLike], T], timeout: Optional[float] = None) -> Iterable[T]:
162 """An infinite stream of events. The given read callable is called for each file
163 that is reported as ready"""
164 yield from (read(fd) for fd in file_stream(fds))
167async def async_selector_stream(selector: selectors.BaseSelector) -> AsyncIterator[SelectorEvent]:
168 """An asyncronous infinite stream of selector read events"""
169 async with contextlib.aclosing(astream(selector, selector.select)) as stream:
170 async for events in stream:
171 for event in events:
172 yield event
175async def async_selector_file_stream(fds: Collection[FDLike]) -> AsyncIterator[SelectorEvent]:
176 """An asyncronous infinite stream of selector read events"""
177 selector = Selector(fds)
178 async with contextlib.aclosing(async_selector_stream(selector)) as stream:
179 async for event in stream:
180 yield event
183async def async_file_stream(fds: Collection[FDLike]) -> AsyncIterator[FDLike]:
184 """An asyncronous infinite stream of read ready files"""
185 async with contextlib.aclosing(async_selector_file_stream(fds)) as stream:
186 async for key, _ in stream:
187 yield key.fileobj
190async def async_event_stream(fds: Collection[FDLike], read: Callable[[FDLike], T]):
191 """An asyncronous stream of events. The given read callable is called for each file
192 that is reported as ready"""
193 async with contextlib.aclosing(async_file_stream(fds)) as stream:
194 async for fd in stream:
195 yield read(fd)
198def make_find(iter_devices: Callable[[], Iterator], needs_open=True) -> Callable:
199 """
200 Create a find function for the given callable. The callable should
201 return an iterable where each element has the context manager capability
202 (ie, it can be used in a with statement)
203 """
205 def find(find_all=False, custom_match=None, **kwargs):
206 idevs = iter_devices()
207 if kwargs or custom_match:
209 def simple_accept(dev):
210 result = all(getattr(dev, key) == value for key, value in kwargs.items())
211 if result and custom_match:
212 return custom_match(dev)
213 return result
215 if needs_open:
217 def accept(dev):
218 with dev:
219 return simple_accept(dev)
220 else:
221 accept = simple_accept
223 idevs = filter(accept, idevs)
224 return idevs if find_all else next(idevs, None)
226 return find
229class Version:
230 """Simple version supporting only a.b.c format"""
232 def __init__(self, major: int, minor: int, patch: int):
233 self.major = major
234 self.minor = minor
235 self.patch = patch
237 @classmethod
238 def from_tuple(cls, sequence: Iterable[Union[str, int]]):
239 """
240 Create a Version from any sequence/iterable of 3 elements
242 Examples that create a Version "3.2.1" object:
243 ```python
244 Version.from_tuple([3, 2, 1])
245 Version.from_tuple(["3", 2, "1"])
246 Version.from_tuple(range(3, 0, -1))
248 # This will work although not recommended since it will not work
249 # when any of the members is bigger than 9
250 Version.from_tuple("321")
251 ```
252 """
253 return cls(*map(int, sequence))
255 @classmethod
256 def from_str(cls, text):
257 """
258 Create a Version from text
260 Example that create a Version "3.2.1" object:
261 ```python
262 Version.from_str("3.2.1")
263 ```
264 """
265 return cls.from_tuple(text.split(".", 2))
267 @classmethod
268 def from_number(cls, number: int):
269 """
270 Create a Version from an integer where each member corresponds to
271 one byte in the integer so that 0xFFFEFD corresponds to 255.254.253
273 Example that create a Version "3.2.1" object:
274 ```python
275 Version.from_number((3<<16) + (2<<8) + 1)
276 ```
277 """
278 return cls((number >> 16) & 0xFF, (number >> 8) & 0xFF, number & 0xFF)
280 def __int__(self):
281 return (self.major << 16) + (self.minor << 8) + self.patch
283 def __repr__(self):
284 return f"{self.major}.{self.minor}.{self.patch}"
286 def __format__(self, fmt):
287 return format(repr(self), fmt)
289 def __getitem__(self, item):
290 return self.tuple[item]
292 def __eq__(self, other):
293 return self.tuple == self._try_convert(other).tuple
295 def __lt__(self, other):
296 return self.tuple < self._try_convert(other).tuple
298 def __le__(self, other):
299 return self == other or self < other
301 def __gt__(self, other):
302 return not self <= other
304 def __ge__(self, other):
305 return not self < other
307 @classmethod
308 def _try_convert(cls, value):
309 if isinstance(value, cls):
310 return value
311 elif isinstance(value, int):
312 return cls.from_number(value)
313 elif isinstance(value, str):
314 return cls.from_str(value)
315 elif isinstance(value, (tuple, list)):
316 return cls.from_tuple(value)
317 raise ValueError("Comparison with non-Version object")
319 @property
320 def tuple(self):
321 """
322 Returns a 3 element tuple representing the version
323 so `Version(3,2,1).tuple()` yields the tuple `(3, 2, 1)`
324 """
325 return self.major, self.minor, self.patch
328def bit_indexes(number: int) -> list[int]:
329 """
330 Return the list of indexes that have an active bit on the number.
332 Example bit_indexes(74) gives [1, 3, 6] (74 == 0b1001010)
333 """
334 return [i for i, c in enumerate(bin(number)[:1:-1]) if c == "1"]
337def sequence_indexes(it: Iterable[T]) -> dict[T, int]:
338 """Return a map with key being the value in the given iterable and value
339 the index where it appears in the map. If the same element appears more
340 than once then the last occurence is returned.
341 Elements must be non mutable so they can appear in the returned map
342 Example:
344 `sequence_indexes([10, "bla", 1]) == {10: 0, "bla": 1, 1: 2}`
346 Args:
347 lines (Iterable[int]): The iterable to be returned
349 Returns:
350 dict[int, int]: map of values and their indexes
351 """
352 return {elem: index for index, elem in enumerate(it)}
355def index_mask(indexes: dict[T, int], elements: Iterable[T]) -> int:
356 """Return a bit mask with active bits from elements. The bit index
357 for each element is given by the indexes
359 Args:
360 indexes (dict[T, int]): bit index map
361 elements (Iterable[T]): list of active elements
363 Returns:
364 int: bit mask
365 """
366 return functools.reduce(operator.or_, (1 << indexes[element] for element in elements), 0)
369ascii_alphanumeric = string.ascii_letters + string.digits
372def random_name(min_length=32, max_length=32):
373 """
374 Generates a random name like text of ascii characters (letters or digits).
376 The first character is always a letter
377 """
378 if not (k := random.randint(min_length, max_length)):
379 return ""
380 first = random.choice(string.ascii_letters)
381 return first + "".join(random.choices(ascii_alphanumeric, k=k - 1))