1- import errno
21import io
32import mmap
43import os
4+ from abc import ABCMeta , abstractmethod
5+ from tempfile import NamedTemporaryFile , TemporaryFile
56from typing import Optional
67
7- from dmoj .cptbox ._cptbox import memory_fd_create , memory_fd_seal
8+ from dmoj .cptbox ._cptbox import memfd_create , memfd_seal
89
910
10- class MemoryIO (io .FileIO ):
11- def __init__ (self , prefill : Optional [bytes ] = None , seal = False ) -> None :
12- super ().__init__ (memory_fd_create (), 'r+' )
11+ def _make_fd_readonly (fd ):
12+ new_fd = os .open (f'/proc/self/fd/{ fd } ' , os .O_RDONLY )
13+ try :
14+ os .dup2 (new_fd , fd )
15+ finally :
16+ os .close (new_fd )
17+
18+
19+ class MmapableIO (io .FileIO , metaclass = ABCMeta ):
20+ def __init__ (self , fd , * , prefill : Optional [bytes ] = None , seal = False ) -> None :
21+ super ().__init__ (fd , 'r+' )
22+
1323 if prefill :
1424 self .write (prefill )
1525 if seal :
1626 self .seal ()
1727
18- def seal (self ) -> None :
19- fd = self .fileno ()
20- try :
21- memory_fd_seal (fd )
22- except OSError as e :
23- if e .errno == errno .ENOSYS :
24- # FreeBSD
25- self .seek (0 , os .SEEK_SET )
26- return
27- raise
28+ @classmethod
29+ @abstractmethod
30+ def usable_with_name (cls ) -> bool :
31+ ...
2832
29- new_fd = os .open (f'/proc/self/fd/{ fd } ' , os .O_RDONLY )
30- try :
31- os .dup2 (new_fd , fd )
32- finally :
33- os .close (new_fd )
33+ @abstractmethod
34+ def seal (self ) -> None :
35+ ...
3436
37+ @abstractmethod
3538 def to_path (self ) -> str :
36- return f'/proc/ { os . getpid () } /fd/ { self . fileno () } '
39+ ...
3740
3841 def to_bytes (self ) -> bytes :
3942 try :
@@ -43,3 +46,71 @@ def to_bytes(self) -> bytes:
4346 if e .args [0 ] == 'cannot mmap an empty file' :
4447 return b''
4548 raise
49+
50+
51+ class NamedFileIO (MmapableIO ):
52+ _name : str
53+
54+ def __init__ (self , * , prefill : Optional [bytes ] = None , seal = False ) -> None :
55+ with NamedTemporaryFile (delete = False ) as f :
56+ self ._name = f .name
57+ super ().__init__ (os .dup (f .fileno ()), prefill = prefill , seal = seal )
58+
59+ def seal (self ) -> None :
60+ self .seek (0 , os .SEEK_SET )
61+
62+ def close (self ) -> None :
63+ super ().close ()
64+ os .unlink (self ._name )
65+
66+ def to_path (self ) -> str :
67+ return self ._name
68+
69+ @classmethod
70+ def usable_with_name (cls ):
71+ return True
72+
73+
74+ class UnnamedFileIO (MmapableIO ):
75+ def __init__ (self , * , prefill : Optional [bytes ] = None , seal = False ) -> None :
76+ with TemporaryFile () as f :
77+ super ().__init__ (os .dup (f .fileno ()), prefill = prefill , seal = seal )
78+
79+ def seal (self ) -> None :
80+ self .seek (0 , os .SEEK_SET )
81+ _make_fd_readonly (self .fileno ())
82+
83+ def to_path (self ) -> str :
84+ return f'/proc/{ os .getpid ()} /fd/{ self .fileno ()} '
85+
86+ @classmethod
87+ def usable_with_name (cls ):
88+ with cls () as f :
89+ return os .path .exists (f .to_path ())
90+
91+
92+ class MemfdIO (MmapableIO ):
93+ def __init__ (self , * , prefill : Optional [bytes ] = None , seal = False ) -> None :
94+ super ().__init__ (memfd_create (), prefill = prefill , seal = seal )
95+
96+ def seal (self ) -> None :
97+ fd = self .fileno ()
98+ memfd_seal (fd )
99+ _make_fd_readonly (fd )
100+
101+ def to_path (self ) -> str :
102+ return f'/proc/{ os .getpid ()} /fd/{ self .fileno ()} '
103+
104+ @classmethod
105+ def usable_with_name (cls ):
106+ try :
107+ with cls () as f :
108+ return os .path .exists (f .to_path ())
109+ except OSError :
110+ return False
111+
112+
113+ # Try to use memfd if possible, otherwise fallback to unlinked temporary files
114+ # (UnnamedFileIO). On FreeBSD and some other systems, /proc/[pid]/fd doesn't
115+ # exist, so to_path() will not work. We fall back to NamedFileIO in that case.
116+ MemoryIO = next ((i for i in (MemfdIO , UnnamedFileIO , NamedFileIO ) if i .usable_with_name ()))
0 commit comments