Coverage for linuxpy/io.py: 100%

46 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2026-02-19 15:11 +0100

1# 

2# This file is part of the linuxpy project 

3# 

4# Copyright (c) 2023 Tiago Coutinho 

5# Distributed under the GPLv3 license. See LICENSE for more info. 

6 

7import functools 

8import importlib 

9import os 

10import select 

11 

12 

13def fopen(path, rw=False, binary=True, blocking=False, close_on_exec=True): 

14 def opener(path, flags): 

15 if not blocking: 

16 flags |= os.O_NONBLOCK 

17 if close_on_exec: 

18 flags |= os.O_CLOEXEC 

19 return os.open(path, flags) 

20 

21 kwargs = {"buffering": 0, "opener": opener} 

22 flgs = "rb" if binary else "r" 

23 if isinstance(rw, bool): 

24 flgs = "rb" if binary else "r" 

25 if rw: 

26 flgs += "+" 

27 else: 

28 flgs = rw 

29 if binary: 

30 flgs += "b" 

31 

32 return open(path, flgs, **kwargs) 

33 

34 

35class IO: 

36 open = functools.partial(fopen, blocking=False) 

37 os = os 

38 select = select 

39 

40 

41class GeventModule: 

42 def __init__(self, name): 

43 self.name = name 

44 self._module = None 

45 

46 @property 

47 def module(self): 

48 if self._module is None: 

49 self._module = importlib.import_module(f"gevent.{self.name}") 

50 return self._module 

51 

52 def __getattr__(self, name): 

53 attr = getattr(self.module, name) 

54 setattr(self, name, attr) 

55 return attr 

56 

57 

58class GeventIO: 

59 @staticmethod 

60 def open(path, rw=False, blocking=False): 

61 mode = "rb+" if rw else "rb" 

62 import gevent.fileobject 

63 

64 return gevent.fileobject.FileObject(path, mode, buffering=0) 

65 

66 os = GeventModule("os") 

67 select = GeventModule("select")