forked from NVIDIA/NVFlare
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfile_pipe_test.py
More file actions
147 lines (115 loc) · 6.07 KB
/
file_pipe_test.py
File metadata and controls
147 lines (115 loc) · 6.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from unittest.mock import patch
import pytest
from nvflare.fuel.utils.constants import Mode
from nvflare.fuel.utils.pipe.file_pipe import FilePipe
class TestFilePipeDeferred:
"""FilePipe must not create root_path at construction time.
Eager os.makedirs() in __init__ materialises template paths like
{WORKSPACE}/{JOB_ID}/{SITE_NAME} as literal directories on the
packaging machine during recipe construction (Codex review finding).
Directory creation must be deferred to open().
"""
def test_init_does_not_create_root_path(self, tmp_path):
"""Constructing FilePipe must not create the root_path directory."""
root = str(tmp_path / "pipe_root")
assert not os.path.exists(root)
FilePipe(mode=Mode.PASSIVE, root_path=root)
assert not os.path.exists(root), "FilePipe.__init__ must not create root_path"
def test_init_does_not_create_template_path(self, tmp_path):
"""Template paths with curly braces must not be created as literal dirs."""
# Simulate the recipe default path under a clean tmp_path so the test
# is not sensitive to leftover directories from previous runs.
template_path = str(tmp_path / "{WORKSPACE}" / "{JOB_ID}" / "{SITE_NAME}")
FilePipe(mode=Mode.PASSIVE, root_path=template_path)
assert not os.path.exists(template_path), "FilePipe.__init__ must not create template path as literal dir"
def test_remove_root_false_before_open(self, tmp_path):
"""_remove_root must be False before open() is called."""
root = str(tmp_path / "pipe_root")
pipe = FilePipe(mode=Mode.PASSIVE, root_path=root)
assert pipe._remove_root is False
def test_open_creates_root_path(self, tmp_path):
"""open() must create root_path when it does not exist."""
root = str(tmp_path / "pipe_root")
assert not os.path.exists(root)
pipe = FilePipe(mode=Mode.PASSIVE, root_path=root)
pipe.open("test_pipe")
assert os.path.exists(root)
def test_open_sets_remove_root_when_it_created_the_dir(self, tmp_path):
"""_remove_root must be True only when open() created root_path."""
root = str(tmp_path / "pipe_root")
pipe = FilePipe(mode=Mode.PASSIVE, root_path=root)
pipe.open("test_pipe")
assert pipe._remove_root is True
def test_open_does_not_set_remove_root_for_preexisting_dir(self, tmp_path):
"""_remove_root must remain False when root_path already existed."""
root = str(tmp_path / "pipe_root")
os.makedirs(root)
pipe = FilePipe(mode=Mode.PASSIVE, root_path=root)
pipe.open("test_pipe")
assert pipe._remove_root is False
class TestFilePipeGetNextTOCTOU:
"""_get_next must handle TOCTOU races without raising BrokenPipeError."""
def _make_pipe(self, tmp_path):
root = str(tmp_path / "pipe_root")
pipe = FilePipe(mode=Mode.PASSIVE, root_path=root)
pipe.open("test_pipe")
return pipe
def test_file_disappears_during_mtime_sort_returns_none(self, tmp_path):
"""If a file disappears while os.getmtime is called during sort, _get_next returns None.
_safe_mtime catches FileNotFoundError and returns float('inf') so the sort
succeeds, but _read_file then also raises BrokenPipeError because the file
is gone. _get_next must skip it and return None.
"""
pipe = self._make_pipe(tmp_path)
from_dir = pipe.y_path
fake_file = os.path.join(from_dir, "fake_msg.fobs")
open(fake_file, "w").close()
with patch("nvflare.fuel.utils.pipe.file_pipe.os.path.getmtime", side_effect=FileNotFoundError):
with patch.object(pipe, "_read_file", side_effect=BrokenPipeError("pipe closed")):
result = pipe._get_next(from_dir)
assert result is None
def test_file_disappears_between_listdir_and_read_returns_none(self, tmp_path):
"""If a file disappears between listdir and _read_file, _get_next returns None."""
pipe = self._make_pipe(tmp_path)
from_dir = pipe.y_path
fake_file = os.path.join(from_dir, "fake_msg.fobs")
open(fake_file, "w").close()
# Simulate the file being removed just before _read_file is called.
original_read_file = pipe._read_file
def disappear_then_raise(path):
os.remove(path)
raise BrokenPipeError("pipe closed")
with patch.object(pipe, "_read_file", side_effect=disappear_then_raise):
result = pipe._get_next(from_dir)
assert result is None
def test_all_files_race_away_returns_none(self, tmp_path):
"""When every file in the listing races away, _get_next returns None without raising."""
pipe = self._make_pipe(tmp_path)
from_dir = pipe.y_path
for i in range(3):
f = os.path.join(from_dir, f"msg_{i}.fobs")
open(f, "w").close()
with patch.object(pipe, "_read_file", side_effect=BrokenPipeError("pipe closed")):
result = pipe._get_next(from_dir)
assert result is None
def test_get_next_does_not_suppress_broken_pipe_from_listdir(self, tmp_path):
"""_get_next must re-raise BrokenPipeError when os.listdir itself fails (real failure)."""
pipe = self._make_pipe(tmp_path)
from_dir = pipe.y_path
with patch("nvflare.fuel.utils.pipe.file_pipe.os.listdir", side_effect=OSError("dir gone")):
with pytest.raises(BrokenPipeError):
pipe._get_next(from_dir)