Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions nvflare/fuel/utils/pipe/file_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,14 +205,29 @@ def _get_next(self, from_dir: str):
except Exception:
raise BrokenPipeError(f"error reading from {from_dir}")

if files:
files = [os.path.join(from_dir, f) for f in files]
files.sort(key=os.path.getmtime, reverse=False)
file_path = files[0]
return self._read_file(file_path)
else:
if not files:
return None

files = [os.path.join(from_dir, f) for f in files]

def _safe_mtime(f):
try:
return os.path.getmtime(f)
except FileNotFoundError:
return float("inf")

files.sort(key=_safe_mtime)
for file_path in files:
try:
return self._read_file(file_path)
except BrokenPipeError:
# File was removed between listdir and read (TOCTOU race).
# This happens when the sender's heartbeat send times out and
# deletes its own file just as the receiver is about to read it.
# Skip this file and try the next one.
continue
return None

def _get_from_dir(self, from_dir: str, timeout=None):
if not timeout or timeout <= 0:
return self._get_next(from_dir)
Expand Down
7 changes: 6 additions & 1 deletion nvflare/fuel/utils/pipe/pipe_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,12 @@ def _try_read(self):
# the pipe handler is most likely stopped, but we leave it for the while loop to decide
continue

msg = p.receive()
try:
msg = p.receive()
except BrokenPipeError:
# Transient TOCTOU race in FilePipe: a file disappeared between
# listdir and read. This is not a real pipe failure — retry next cycle.
continue
now = time.time()

if msg:
Expand Down
Loading