Coverage for linuxpy/io.py: 95%

33 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-10-21 07:58 +0200

1# 

2# This file is part of the linuxpy project 

3# 

4# Copyright (c) 2023 Tiago Coutinho 

5# Distributed under the GPLv3 license. See LICENSE for more info. 

6 

7import functools 

8import os 

9import select 

10 

11 

def fopen(path, rw=False, binary=True, blocking=False, close_on_exec=True):
    """Open *path* with control over the OS-level open flags.

    Args:
        path: Path of the file (typically a device node) to open.
        rw: ``False`` opens read-only, ``True`` opens for reading and
            writing. Any non-bool value is used as the mode string itself.
        binary: When true the file is opened in binary mode; a ``"b"`` is
            appended to a custom mode string only if it is missing.
        blocking: When false, ``os.O_NONBLOCK`` is added to the open flags.
        close_on_exec: When true, ``os.O_CLOEXEC`` is added to the open
            flags. When false, the descriptor is explicitly made
            inheritable after the file has been opened.

    Returns:
        The file object produced by the built-in ``open``. Binary files are
        unbuffered; text files use the default buffering because Python
        rejects unbuffered text I/O.
    """

    def opener(path, flags):
        if not blocking:
            flags |= os.O_NONBLOCK
        if close_on_exec:
            flags |= os.O_CLOEXEC
        return os.open(path, flags)

    if isinstance(rw, bool):
        mode = "rb" if binary else "r"
        if rw:
            mode += "+"
    else:
        mode = rw
        # Avoid building an invalid mode such as "rbb".
        if binary and "b" not in mode:
            mode += "b"

    kwargs = {"opener": opener}
    if "b" in mode:
        # buffering=0 is only legal in binary mode; in text mode it raises
        # ValueError, so text files keep the default buffering.
        kwargs["buffering"] = 0

    fobj = open(path, mode, **kwargs)
    if not close_on_exec:
        # os.open() returns a non-inheritable descriptor (PEP 446), so just
        # leaving out O_CLOEXEC is not enough: inheritance has to be
        # switched on explicitly once the file is open.
        try:
            os.set_inheritable(fobj.fileno(), True)
        except OSError:
            fobj.close()
            raise
    return fobj

32 

33 

class IO:
    """Default I/O backend built on the standard library.

    Exposes ``open`` (``fopen`` preset to non-blocking mode) and ``select``
    (``select.select``) as class-level callables.
    """

    # Wrapped in staticmethod so the partial is never bound to an instance:
    # Python 3.13+ warns that a bare functools.partial stored on a class
    # will start behaving like a method when accessed through an instance.
    open = staticmethod(functools.partial(fopen, blocking=False))
    select = select.select

37 

38 

class GeventIO:
    """I/O backend that delegates to gevent.

    gevent is imported inside each method, so it is only required when
    this backend is actually used.
    """

    @staticmethod
    def open(path, rw=False):
        """Return an unbuffered binary gevent ``FileObject`` for *path*.

        The file is opened read/write when *rw* is true, read-only otherwise.
        """
        if rw:
            file_mode = "rb+"
        else:
            file_mode = "rb"

        from gevent.fileobject import FileObject

        return FileObject(path, file_mode, buffering=0)

    @staticmethod
    def select(*args, **kwargs):
        """Forward all arguments to ``gevent.select.select``."""
        from gevent.select import select as gevent_select

        return gevent_select(*args, **kwargs)