forked from NVIDIA/NVFlare
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfile_pipe.py
More file actions
304 lines (249 loc) · 10.2 KB
/
file_pipe.py
File metadata and controls
304 lines (249 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
# Copyright (c) 2021, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import shutil
import time
from typing import Tuple
from nvflare.fuel.utils.attributes_exportable import ExportMode
from nvflare.fuel.utils.constants import Mode
from nvflare.fuel.utils.pipe.file_accessor import FileAccessor
from nvflare.fuel.utils.pipe.file_name_utils import file_name_to_message, message_to_file_name
from nvflare.fuel.utils.pipe.fobs_file_accessor import FobsFileAccessor
from nvflare.fuel.utils.pipe.pipe import Message, Pipe, Topic
from nvflare.fuel.utils.validation_utils import check_object_type, check_positive_number, check_str
class FilePipe(Pipe):
    """Pipe implementation that exchanges messages through files in a shared directory.

    After ``open(name)`` the pipe owns ``<root_path>/<name>/`` with three sub-directories:

    - ``x``: queue read by the ACTIVE endpoint (written by the PASSIVE endpoint)
    - ``y``: queue read by the PASSIVE endpoint (written by the ACTIVE endpoint)
    - ``t``: staging area; files are written/read here and moved with ``os.rename`` so the
      queue directories never expose a partially written message.
    """

    def __init__(self, mode: Mode, root_path: str, file_check_interval=0.1):
        """Implementation of communication through the file system.

        Args:
            mode (Mode): Mode of the endpoint. A pipe has two endpoints.
                An endpoint can be either the one that initiates communication or the one listening.
            root_path (str): root path for this file pipe, folders and files will be created under this root_path
                for communication.
            file_check_interval (float): how often (in seconds) to check whether a file exists.
        """
        super().__init__(mode=mode)
        check_positive_number("file_check_interval", file_check_interval)
        check_str("root_path", root_path)

        # set to True by open() only when this endpoint created root_path itself
        self._remove_root = False
        self.root_path = root_path
        self.file_check_interval = file_check_interval

        # all paths stay None until open() is called
        self.pipe_path = None
        self.x_path = None
        self.y_path = None
        self.t_path = None

        # bind the get/put implementations that match this endpoint's side
        if self.mode == Mode.ACTIVE:
            self.get_f = self.x_get
            self.put_f = self.x_put
        elif self.mode == Mode.PASSIVE:
            self.get_f = self.y_get
            self.put_f = self.y_put

        self.accessor = FobsFileAccessor()  # default

    def set_file_accessor(self, accessor: FileAccessor):
        """Sets the file accessor to be used by the pipe.

        The default file accessor is FobsFileAccessor.

        Args:
            accessor: the accessor to be used.
        """
        check_object_type("accessor", accessor, FileAccessor)
        self.accessor = accessor

    @staticmethod
    def _make_dir(path):
        """Creates a single directory, tolerating the case where it already exists."""
        try:
            os.mkdir(path)
        except FileExistsError:
            # this is okay - the peer endpoint may have created it first
            pass

    @staticmethod
    def _remove_path(path: str):
        """Removes a message payload, which may be a regular file or a directory tree.

        Raises:
            FileNotFoundError: if the path no longer exists.
        """
        if os.path.isdir(path) and not os.path.islink(path):
            shutil.rmtree(path)
        else:
            os.remove(path)

    def open(self, name: str):
        """Opens the pipe, creating the directory layout under root_path if needed.

        Args:
            name: name of the pipe; used as the sub-directory name under root_path.

        Raises:
            RuntimeError: if no file accessor is set.
        """
        if not self.accessor:
            raise RuntimeError("File accessor is not set. Make sure to set a FileAccessor before opening the pipe")

        # Try to create the root instead of checking for it first: both endpoints may open
        # concurrently, and only the one that actually created the root should remove it on close.
        try:
            os.makedirs(self.root_path)
            self._remove_root = True
        except FileExistsError:
            pass

        pipe_path = os.path.join(self.root_path, name)
        x_path = os.path.join(pipe_path, "x")
        y_path = os.path.join(pipe_path, "y")
        t_path = os.path.join(pipe_path, "t")

        # _make_dir already tolerates existing directories, so no pre-check is needed
        for path in (pipe_path, x_path, y_path, t_path):
            self._make_dir(path)

        self.pipe_path = pipe_path
        self.x_path = x_path
        self.y_path = y_path
        self.t_path = t_path

    @staticmethod
    def _clear_dir(p: str):
        """Removes every entry (file or directory) directly under the specified directory.

        Entries that disappear while clearing (e.g. consumed by the peer) are ignored.
        """
        for entry in os.listdir(p):
            try:
                FilePipe._remove_path(os.path.join(p, entry))
            except FileNotFoundError:
                pass

    def _create_file(self, to_dir: str, msg: Message) -> str:
        """Writes the message into the staging area and moves it into the target queue.

        Args:
            to_dir: the queue directory the message is delivered to.
            msg: the message to be written.

        Returns:
            the path of the delivered file.

        Raises:
            BrokenPipeError: if the pipe is not open or its directories no longer exist.
        """
        # check the pipe state before touching any path: t_path is None until open()
        if not self.pipe_path:
            raise BrokenPipeError("pipe broken")

        file_name = message_to_file_name(msg)
        file_path = os.path.join(to_dir, file_name)
        tmp_path = os.path.join(self.t_path, file_name)

        try:
            # write to the staging area first, then rename, so the peer never sees a partial file
            self.accessor.write(msg.data, tmp_path)
            os.rename(tmp_path, file_path)
        except FileNotFoundError as e:
            raise BrokenPipeError("pipe closed") from e
        return file_path

    def clear(self):
        """Removes all pending content from both queues and the staging area.

        Raises:
            BrokenPipeError: if the pipe has never been opened.
        """
        dirs = (self.x_path, self.y_path, self.t_path)
        # os.listdir(None) would list the current working directory, so never pass None through
        if any(d is None for d in dirs):
            raise BrokenPipeError("pipe is not open")
        for d in dirs:
            self._clear_dir(d)

    def _monitor_file(self, file_path: str, timeout=None) -> bool:
        """Monitors the file until it's read-and-removed by peer, or timed out.

        Args:
            file_path: the path to be monitored
            timeout: how long to wait for timeout; a falsy value means wait indefinitely

        Returns:
            whether the file has been read and removed

        Raises:
            BrokenPipeError: if the pipe is closed while waiting.
        """
        start = time.time()
        while True:
            if not self.pipe_path:
                raise BrokenPipeError("pipe broken")

            if not os.path.exists(file_path):
                return True

            if timeout and time.time() - start > timeout:
                # timed out - try to withdraw the message
                try:
                    self._remove_path(file_path)
                except FileNotFoundError:
                    # the file is read by the peer!
                    return True
                return False

            time.sleep(self.file_check_interval)

    def x_put(self, msg: Message, timeout) -> bool:
        """Delivers a message from the ACTIVE side.

        Args:
            msg: the message to be sent
            timeout: number of secs to wait for the peer to read the message

        Returns: whether file is read by the peer
        """
        # put it in Y's queue
        file_path = self._create_file(self.y_path, msg)
        return self._monitor_file(file_path, timeout)

    def _read_file(self, file_path: str):
        """Claims a queued file, reads it, removes it, and returns the resulting message.

        Args:
            file_path: path of the queued file.

        Returns:
            the message with data, sent_time and received_time populated.

        Raises:
            BrokenPipeError: if the file (or the pipe directory) disappeared.
            RuntimeError: if the claimed path is neither a file nor a directory after reading.
        """
        # since reading file may take time and another process may try to delete the file
        # we move the file to a temp name before reading it
        file_name = os.path.basename(file_path)
        msg = file_name_to_message(file_name)
        tmp_path = os.path.join(self.t_path, file_name)
        try:
            create_time = os.path.getctime(file_path)
            os.rename(file_path, tmp_path)
            data = self.accessor.read(tmp_path)
            if os.path.isfile(tmp_path):
                os.remove(tmp_path)  # remove this file
            elif os.path.isdir(tmp_path):
                shutil.rmtree(tmp_path)
            else:
                raise RuntimeError(f"cannot remove unsupported path: '{tmp_path}'")
            msg.data = data
            msg.sent_time = create_time
            msg.received_time = time.time()
            return msg
        except FileNotFoundError as e:
            raise BrokenPipeError("pipe closed") from e

    def _get_next(self, from_dir: str):
        """Returns the oldest readable message in the queue directory, or None if there is none.

        Raises:
            BrokenPipeError: if the queue directory cannot be listed.
        """
        try:
            names = os.listdir(from_dir)
        except Exception as e:
            raise BrokenPipeError(f"error reading from {from_dir}") from e

        if not names:
            return None

        def _safe_mtime(f):
            # a file may vanish between listdir and stat; sort such files last
            try:
                return os.path.getmtime(f)
            except FileNotFoundError:
                return float("inf")

        candidates = sorted((os.path.join(from_dir, n) for n in names), key=_safe_mtime)
        for file_path in candidates:
            try:
                return self._read_file(file_path)
            except BrokenPipeError:
                # File was removed between listdir and read (TOCTOU race).
                # This happens when the sender's heartbeat send times out and
                # deletes its own file just as the receiver is about to read it.
                # Skip this file and try the next one.
                continue
        return None

    def _get_from_dir(self, from_dir: str, timeout=None):
        """Polls the queue directory for a message.

        Args:
            from_dir: the queue directory to read from.
            timeout: max number of secs to wait; a falsy or non-positive value means check once.

        Returns:
            the next message, or None if nothing arrived in time.
        """
        if not timeout or timeout <= 0:
            return self._get_next(from_dir)

        start = time.time()
        while True:
            msg = self._get_next(from_dir)
            if msg:
                return msg

            if time.time() - start >= timeout:
                break
            time.sleep(self.file_check_interval)

        return None

    def x_get(self, timeout=None):
        """Receives a message on the ACTIVE side."""
        # read from X's queue
        return self._get_from_dir(self.x_path, timeout)

    def y_put(self, msg: Message, timeout) -> bool:
        """Delivers a message from the PASSIVE side.

        Returns: whether file is read by the peer
        """
        # put it in X's queue
        file_path = self._create_file(self.x_path, msg)
        return self._monitor_file(file_path, timeout)

    def y_get(self, timeout=None):
        """Receives a message on the PASSIVE side."""
        # read from Y's queue
        return self._get_from_dir(self.y_path, timeout)

    def send(self, msg: Message, timeout=None) -> bool:
        """Sends the specified message to the peer.

        Args:
            msg: the message to be sent
            timeout: if specified, number of secs to wait for the peer to read the message.
                If not specified, wait indefinitely - except for END, ABORT and HEARTBEAT
                messages, which default to 5 seconds.

        Returns:
            Whether the message is read by the peer.

        Raises:
            BrokenPipeError: if the pipe is not open.
        """
        if not self.pipe_path:
            raise BrokenPipeError("pipe is not open")

        if not timeout and msg.topic in [Topic.END, Topic.ABORT, Topic.HEARTBEAT]:
            timeout = 5.0

        return self.put_f(msg, timeout)

    def receive(self, timeout=None):
        """Receives the next message from the peer.

        Args:
            timeout: max number of secs to wait; if not specified, check once without waiting.

        Returns:
            the received message, or None if no message is available.

        Raises:
            BrokenPipeError: if the pipe is not open.
        """
        if not self.pipe_path:
            raise BrokenPipeError("pipe is not open")
        return self.get_f(timeout)

    def close(self):
        """Closes the pipe.

        The PASSIVE endpoint removes the pipe directory, and also the root directory
        if this endpoint created it during open().
        """
        pipe_path = self.pipe_path
        # clearing pipe_path first makes in-flight send/monitor loops see the pipe as broken
        self.pipe_path = None
        if self.mode == Mode.PASSIVE:
            if pipe_path and os.path.exists(pipe_path):
                shutil.rmtree(pipe_path, ignore_errors=True)
            if self._remove_root and os.path.exists(self.root_path):
                shutil.rmtree(self.root_path, ignore_errors=True)

    def can_resend(self) -> bool:
        """Returns False: this pipe reports that it cannot resend messages."""
        return False

    def export(self, export_mode: str) -> Tuple[str, dict]:
        """Exports the class path and constructor args for this endpoint or for its peer.

        Args:
            export_mode: ExportMode.SELF keeps this endpoint's mode; any other value
                exports the opposite mode.

        Returns:
            a tuple of (fully qualified class name, constructor keyword arguments).
        """
        if export_mode == ExportMode.SELF:
            mode = self.mode
        else:
            mode = Mode.ACTIVE if self.mode == Mode.PASSIVE else Mode.PASSIVE

        export_args = {"mode": mode, "root_path": self.root_path}
        return f"{self.__module__}.{self.__class__.__name__}", export_args