Coverage for linuxpy/io.py: 95%
33 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-10-21 07:58 +0200
« prev ^ index » next coverage.py v7.6.1, created at 2024-10-21 07:58 +0200
1#
2# This file is part of the linuxpy project
3#
4# Copyright (c) 2023 Tiago Coutinho
5# Distributed under the GPLv3 license. See LICENSE for more info.
7import functools
8import os
9import select
12def fopen(path, rw=False, binary=True, blocking=False, close_on_exec=True):
13 def opener(path, flags):
14 if not blocking:
15 flags |= os.O_NONBLOCK
16 if close_on_exec: 16 ↛ 18line 16 didn't jump to line 18 because the condition on line 16 was always true
17 flags |= os.O_CLOEXEC
18 return os.open(path, flags)
20 kwargs = {"buffering": 0, "opener": opener}
21 flgs = "rb" if binary else "r"
22 if isinstance(rw, bool):
23 flgs = "rb" if binary else "r"
24 if rw:
25 flgs += "+"
26 else:
27 flgs = rw
28 if binary: 28 ↛ 31line 28 didn't jump to line 31 because the condition on line 28 was always true
29 flgs += "b"
31 return open(path, flgs, **kwargs)
34class IO:
35 open = functools.partial(fopen, blocking=False)
36 select = select.select
39class GeventIO:
40 @staticmethod
41 def open(path, rw=False):
42 mode = "rb+" if rw else "rb"
43 import gevent.fileobject
45 return gevent.fileobject.FileObject(path, mode, buffering=0)
47 @staticmethod
48 def select(*args, **kwargs):
49 import gevent.select
51 return gevent.select.select(*args, **kwargs)