Skip to content

Commit ac830a0

Browse files
authored
Make exclusive/atomic writes formally possible (#1749)
1 parent 8be05fc commit ac830a0

File tree

11 files changed

+94
-30
lines changed

11 files changed

+94
-30
lines changed

ci/environment-downstream.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ dependencies:
55
- python=3.11
66
- pip:
77
- git+https://github.com/dask/dask
8+
- git+https://github.com/dask/dask-expr

fsspec/asyn.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -408,7 +408,7 @@ async def _copy(
408408
continue
409409
raise ex
410410

411-
async def _pipe_file(self, path, value, **kwargs):
411+
async def _pipe_file(self, path, value, mode="overwrite", **kwargs):
412412
raise NotImplementedError
413413

414414
async def _pipe(self, path, value=None, batch_size=None, **kwargs):
@@ -517,7 +517,7 @@ async def _cat_ranges(
517517
coros, batch_size=batch_size, nofiles=True, return_exceptions=True
518518
)
519519

520-
async def _put_file(self, lpath, rpath, **kwargs):
520+
async def _put_file(self, lpath, rpath, mode="overwrite", **kwargs):
521521
raise NotImplementedError
522522

523523
async def _put(

fsspec/implementations/http.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,8 +273,12 @@ async def _put_file(
273273
chunk_size=5 * 2**20,
274274
callback=DEFAULT_CALLBACK,
275275
method="post",
276+
mode="overwrite",
276277
**kwargs,
277278
):
279+
if mode != "overwrite":
280+
raise NotImplementedError("Exclusive write")
281+
278282
async def gen_chunks():
279283
# Support passing arbitrary file-like objects
280284
# and use them instead of streams.

fsspec/implementations/memory.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,12 +126,13 @@ def makedirs(self, path, exist_ok=False):
126126
if not exist_ok:
127127
raise
128128

129-
def pipe_file(self, path, value, **kwargs):
129+
def pipe_file(self, path, value, mode="overwrite", **kwargs):
130130
"""Set the bytes of given file
131131
132132
Avoids copies of the data if possible
133133
"""
134-
self.open(path, "wb", data=value)
134+
mode = "xb" if mode == "create" else "wb"
135+
self.open(path, mode=mode, data=value)
135136

136137
def rmdir(self, path):
137138
path = self._strip_protocol(path)
@@ -178,6 +179,8 @@ def _open(
178179
**kwargs,
179180
):
180181
path = self._strip_protocol(path)
182+
if "x" in mode and self.exists(path):
183+
raise FileExistsError
181184
if path in self.pseudo_dirs:
182185
raise IsADirectoryError(path)
183186
parent = path
@@ -197,7 +200,9 @@ def _open(
197200
return f
198201
else:
199202
raise FileNotFoundError(path)
200-
elif mode == "wb":
203+
elif mode in {"wb", "xb"}:
204+
if mode == "xb" and self.exists(path):
205+
raise FileExistsError
201206
m = MemoryFile(self, path, kwargs.get("data"))
202207
if not self._intrans:
203208
m.commit()

fsspec/implementations/reference.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1181,13 +1181,17 @@ async def _rm_file(self, path, **kwargs):
11811181
) # ignores FileNotFound, just as well for directories
11821182
self.dircache.clear() # this is a bit heavy handed
11831183

1184-
async def _pipe_file(self, path, data):
1184+
async def _pipe_file(self, path, data, mode="overwrite", **kwargs):
1185+
if mode == "create" and self.exists(path):
1186+
raise FileExistsError
11851187
# can be str or bytes
11861188
self.references[path] = data
11871189
self.dircache.clear() # this is a bit heavy handed
11881190

1189-
async def _put_file(self, lpath, rpath, **kwargs):
1191+
async def _put_file(self, lpath, rpath, mode="overwrite", **kwargs):
11901192
# puts binary
1193+
if mode == "create" and self.exists(rpath):
1194+
raise FileExistsError
11911195
with open(lpath, "rb") as f:
11921196
self.references[rpath] = f.read()
11931197
self.dircache.clear() # this is a bit heavy handed

fsspec/implementations/tests/memory/memory_test.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,11 @@ class TestMemoryGet(abstract.AbstractGetTests, MemoryFixtures):
1212

1313
class TestMemoryPut(abstract.AbstractPutTests, MemoryFixtures):
1414
pass
15+
16+
17+
class TestMemoryPipe(abstract.AbstractPipeTests, MemoryFixtures):
18+
pass
19+
20+
21+
class TestMemoryOpen(abstract.AbstractOpenTests, MemoryFixtures):
22+
pass

fsspec/implementations/tests/test_smb.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@
1414

1515
pytest.importorskip("smbprotocol")
1616

17+
18+
def delay_rerun(*args):
19+
time.sleep(0.1)
20+
return True
21+
22+
1723
# ruff: noqa: F821
1824

1925
if os.environ.get("WSL_INTEROP"):
@@ -72,7 +78,7 @@ def smb_params(request):
7278
stop_docker(container)
7379

7480

75-
@pytest.mark.flaky(reruns=2, reruns_delay=2)
81+
@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun)
7682
def test_simple(smb_params):
7783
adir = "/home/adir"
7884
adir2 = "/home/adir/otherdir/"
@@ -89,7 +95,7 @@ def test_simple(smb_params):
8995
assert not fsmb.exists(adir)
9096

9197

92-
@pytest.mark.flaky(reruns=2, reruns_delay=2)
98+
@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun)
9399
def test_auto_mkdir(smb_params):
94100
adir = "/home/adir"
95101
adir2 = "/home/adir/otherdir/"
@@ -116,7 +122,7 @@ def test_auto_mkdir(smb_params):
116122
assert not fsmb.exists(another_dir)
117123

118124

119-
@pytest.mark.flaky(reruns=2, reruns_delay=2)
125+
@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun)
120126
def test_with_url(smb_params):
121127
if smb_params["port"] is None:
122128
smb_url = "smb://{username}:{password}@{host}/home/someuser.txt"
@@ -131,7 +137,7 @@ def test_with_url(smb_params):
131137
assert read_result == b"hello"
132138

133139

134-
@pytest.mark.flaky(reruns=2, reruns_delay=2)
140+
@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun)
135141
def test_transaction(smb_params):
136142
afile = "/home/afolder/otherdir/afile"
137143
afile2 = "/home/afolder/otherdir/afile2"
@@ -152,14 +158,14 @@ def test_transaction(smb_params):
152158
assert fsmb.find(adir) == [afile, afile2]
153159

154160

155-
@pytest.mark.flaky(reruns=2, reruns_delay=2)
161+
@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun)
156162
def test_makedirs_exist_ok(smb_params):
157163
fsmb = fsspec.get_filesystem_class("smb")(**smb_params)
158164
fsmb.makedirs("/home/a/b/c")
159165
fsmb.makedirs("/home/a/b/c", exist_ok=True)
160166

161167

162-
@pytest.mark.flaky(reruns=2, reruns_delay=2)
168+
@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun)
163169
def test_rename_from_upath(smb_params):
164170
fsmb = fsspec.get_filesystem_class("smb")(**smb_params)
165171
fsmb.makedirs("/home/a/b/c", exist_ok=True)

fsspec/spec.py

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -778,8 +778,12 @@ def cat_file(self, path, start=None, end=None, **kwargs):
778778
return f.read(end - f.tell())
779779
return f.read()
780780

781-
def pipe_file(self, path, value, **kwargs):
781+
def pipe_file(self, path, value, mode="overwrite", **kwargs):
782782
"""Set the bytes of given file"""
783+
if mode == "create" and self.exists(path):
784+
# Simple but non-atomic: another writer can create the file between this check
785+
# and the open() below. Opening with "xb" instead is likely less widely supported.
786+
raise FileExistsError
783787
with self.open(path, "wb", **kwargs) as f:
784788
f.write(value)
785789

@@ -971,8 +975,12 @@ def get(
971975
with callback.branched(rpath, lpath) as child:
972976
self.get_file(rpath, lpath, callback=child, **kwargs)
973977

974-
def put_file(self, lpath, rpath, callback=DEFAULT_CALLBACK, **kwargs):
978+
def put_file(
979+
self, lpath, rpath, callback=DEFAULT_CALLBACK, mode="overwrite", **kwargs
980+
):
975981
"""Copy single file to remote"""
982+
if mode == "create" and self.exists(rpath):
983+
raise FileExistsError
976984
if os.path.isdir(lpath):
977985
self.makedirs(rpath, exist_ok=True)
978986
return None
@@ -1262,6 +1270,9 @@ def open(
12621270
Target file
12631271
mode: str like 'rb', 'w'
12641272
See builtin ``open()``
1273+
Mode "x" (exclusive write) may be implemented by the backend. Even if
1274+
it is, whether existence is checked up front or on commit, and whether the
1275+
check is atomic, are implementation-dependent.
12651276
block_size: int
12661277
Some indication of buffering - this is a value in bytes
12671278
cache_options : dict, optional
@@ -1795,7 +1806,7 @@ def discard(self):
17951806

17961807
def info(self):
17971808
"""File information about this path"""
1798-
if "r" in self.mode:
1809+
if self.readable():
17991810
return self.details
18001811
else:
18011812
raise ValueError("Info not available while writing")
@@ -1842,7 +1853,7 @@ def write(self, data):
18421853
data: bytes
18431854
Set of bytes to be written.
18441855
"""
1845-
if self.mode not in {"wb", "ab"}:
1856+
if not self.writable():
18461857
raise ValueError("File not in write mode")
18471858
if self.closed:
18481859
raise ValueError("I/O operation on closed file.")
@@ -1875,7 +1886,7 @@ def flush(self, force=False):
18751886
if force:
18761887
self.forced = True
18771888

1878-
if self.mode not in {"wb", "ab"}:
1889+
if self.readable():
18791890
# no-op to flush on read-mode
18801891
return
18811892

@@ -2024,29 +2035,30 @@ def close(self):
20242035
return
20252036
if self.closed:
20262037
return
2027-
if self.mode == "rb":
2028-
self.cache = None
2029-
else:
2030-
if not self.forced:
2031-
self.flush(force=True)
2032-
2033-
if self.fs is not None:
2034-
self.fs.invalidate_cache(self.path)
2035-
self.fs.invalidate_cache(self.fs._parent(self.path))
2038+
try:
2039+
if self.mode == "rb":
2040+
self.cache = None
2041+
else:
2042+
if not self.forced:
2043+
self.flush(force=True)
20362044

2037-
self.closed = True
2045+
if self.fs is not None:
2046+
self.fs.invalidate_cache(self.path)
2047+
self.fs.invalidate_cache(self.fs._parent(self.path))
2048+
finally:
2049+
self.closed = True
20382050

20392051
def readable(self):
20402052
"""Whether opened for reading"""
2041-
return self.mode == "rb" and not self.closed
2053+
return "r" in self.mode and not self.closed
20422054

20432055
def seekable(self):
20442056
"""Whether the file is seekable (only in read mode)"""
20452057
return self.readable()
20462058

20472059
def writable(self):
20482060
"""Whether opened for writing"""
2049-
return self.mode in {"wb", "ab"} and not self.closed
2061+
return self.mode in {"wb", "ab", "xb"} and not self.closed
20502062

20512063
def __del__(self):
20522064
if not self.closed:

fsspec/tests/abstract/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
from fsspec.implementations.local import LocalFileSystem
77
from fsspec.tests.abstract.copy import AbstractCopyTests # noqa: F401
88
from fsspec.tests.abstract.get import AbstractGetTests # noqa: F401
9+
from fsspec.tests.abstract.open import AbstractOpenTests # noqa: F401
10+
from fsspec.tests.abstract.pipe import AbstractPipeTests # noqa: F401
911
from fsspec.tests.abstract.put import AbstractPutTests # noqa: F401
1012

1113

fsspec/tests/abstract/open.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import pytest
2+
3+
4+
class AbstractOpenTests:
    """Backend-agnostic checks for ``fs.open``.

    Intended to be mixed into a concrete test class that supplies the ``fs``
    and ``fs_target`` fixtures (presumably a filesystem instance and a
    writable target path on it -- confirm against the fixture definitions).
    """

    def test_open_exclusive(self, fs, fs_target):
        """Opening an existing path in exclusive mode raises FileExistsError.

        The existing file's content must be left untouched by the failed
        exclusive write.
        """
        with fs.open(fs_target, "wb") as f:
            f.write(b"data")
        with fs.open(fs_target, "rb") as f:
            assert f.read() == b"data"

        # A backend may reject the exclusive write either when the file is
        # opened or only when it is committed on close, so the write and the
        # close both happen inside the ``raises`` block. Using ``with`` also
        # guarantees the handle is closed if no error is raised at open time.
        with pytest.raises(FileExistsError):
            with fs.open(fs_target, "xb") as f:
                f.write(b"other")

        # The rejected write must not have clobbered the original content.
        with fs.open(fs_target, "rb") as f:
            assert f.read() == b"data"

fsspec/tests/abstract/pipe.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import pytest
2+
3+
4+
class AbstractPipeTests:
    """Backend-agnostic checks for ``fs.pipe_file``.

    Intended to be mixed into a concrete test class that supplies the ``fs``
    and ``fs_target`` fixtures (presumably a filesystem instance and a
    writable target path on it -- confirm against the fixture definitions).
    """

    def test_pipe_exclusive(self, fs, fs_target):
        """``mode="create"`` refuses to replace a file; ``"overwrite"`` replaces it."""
        fs.pipe_file(fs_target, b"data")
        assert fs.cat_file(fs_target) == b"data"

        # Use a payload that differs from the stored bytes so that a backend
        # which writes first and raises afterwards is caught by the check below.
        with pytest.raises(FileExistsError):
            fs.pipe_file(fs_target, b"other data", mode="create")
        assert fs.cat_file(fs_target) == b"data"

        fs.pipe_file(fs_target, b"new data", mode="overwrite")
        assert fs.cat_file(fs_target) == b"new data"

0 commit comments

Comments
 (0)