Coverage for linuxpy/io.py: 100%
46 statements
« prev ^ index » next coverage.py v7.10.4, created at 2026-02-19 15:11 +0100
« prev ^ index » next coverage.py v7.10.4, created at 2026-02-19 15:11 +0100
1#
2# This file is part of the linuxpy project
3#
4# Copyright (c) 2023 Tiago Coutinho
5# Distributed under the GPLv3 license. See LICENSE for more info.
7import functools
8import importlib
9import os
10import select
def fopen(path, rw=False, binary=True, blocking=False, close_on_exec=True):
    """Open *path* with the builtin ``open`` using an ``os.open`` based opener.

    Args:
        path: File to open.
        rw: ``False`` opens read-only, ``True`` opens read/write (a ``"+"``
            is appended to the mode). Any non-bool value is used as the
            mode string itself.
        binary: When true, the file is opened in binary mode; a ``"b"`` is
            appended to the mode unless it is already there.
        blocking: When false, ``os.O_NONBLOCK`` is OR-ed into the open flags.
        close_on_exec: When true, ``os.O_CLOEXEC`` is OR-ed into the open
            flags.

    Returns:
        The file object returned by the builtin ``open``. Binary modes are
        opened unbuffered (``buffering=0``); text modes use the default
        buffering policy.
    """

    def opener(path, flags):
        # Add the requested low-level flags on top of the ones derived by
        # ``open`` from the mode string.
        if not blocking:
            flags |= os.O_NONBLOCK
        if close_on_exec:
            flags |= os.O_CLOEXEC
        return os.open(path, flags)

    if isinstance(rw, bool):
        mode = "rb" if binary else "r"
        if rw:
            mode += "+"
    else:
        mode = rw
        # Do not duplicate the "b" when the caller already supplied it:
        # a mode such as "rbb" is rejected by open().
        if binary and "b" not in mode:
            mode += "b"

    # open() refuses buffering=0 in text mode ("can't have unbuffered text
    # I/O"), so only binary modes are opened unbuffered.
    buffering = 0 if "b" in mode else -1
    return open(path, mode, buffering=buffering, opener=opener)
class IO:
    """Namespace grouping the standard-library I/O primitives.

    Attributes:
        open: ``fopen`` with ``blocking=False`` pre-bound.
        os: the standard ``os`` module.
        select: the standard ``select`` module.
    """

    # Wrapped in staticmethod so the partial is never bound to an instance:
    # functools.partial becomes a method descriptor in newer Python versions,
    # which would otherwise pass the instance as the first argument when
    # accessed through ``IO()``. Access through the class is unchanged.
    open = staticmethod(functools.partial(fopen, blocking=False))
    os = os
    select = select
class GeventModule:
    """Lazy proxy for the ``gevent.<name>`` submodule.

    The submodule is imported the first time it is needed. Attributes
    fetched from it are stored on the proxy instance.
    """

    def __init__(self, name):
        self._module = None
        self.name = name

    @property
    def module(self):
        """Return the ``gevent.<name>`` module, importing it on first access."""
        if self._module is not None:
            return self._module
        self._module = importlib.import_module(f"gevent.{self.name}")
        return self._module

    def __getattr__(self, name):
        # Only invoked when regular attribute lookup fails; the value is
        # stored on the instance so later lookups find it directly.
        value = getattr(self.module, name)
        setattr(self, name, value)
        return value
class GeventIO:
    """Namespace grouping gevent-based I/O primitives.

    Attributes:
        open: opens a path as an unbuffered binary gevent ``FileObject``.
        os: lazy proxy for ``gevent.os``.
        select: lazy proxy for ``gevent.select``.
    """

    @staticmethod
    def open(path, rw=False, blocking=False):
        """Open *path* as an unbuffered binary ``gevent.fileobject.FileObject``.

        Args:
            path: passed as the first argument to ``FileObject``.
            rw: when true the mode is ``"rb+"``, otherwise ``"rb"``.
            blocking: accepted but currently not used by this function.
        """
        # gevent is imported here, at call time, not at module import time.
        from gevent.fileobject import FileObject

        if rw:
            mode = "rb+"
        else:
            mode = "rb"
        return FileObject(path, mode, buffering=0)

    os = GeventModule("os")
    select = GeventModule("select")