Skip to content

Commit be98784

Browse files
Prepare Patch Release 2.6.5 (#21745)
* Use fs.pipe() for S3/GCS checkpoint uploads in _atomic_save (#21595) * Use fs.pipe() for S3/GCS checkpoint uploads in _atomic_save Replace fs.open() + f.write() with fs.pipe() for S3 and GCS filesystems, enabling parallel multipart uploads. This gives 4-5x throughput improvement for checkpoints >= 500 MB. Azure is excluded because adlfs stages blocks sequentially, making fs.pipe() slower than f.write(). Fixes #21499 Signed-off-by: c-pozzi <corina.rios@gmail.com> * Use isinstance check for Azure exclusion in _atomic_save Use module_available + isinstance instead of string comparison to detect AzureBlobFileSystem. Fix test mocking to patch module_available so the isinstance check works without adlfs installed. Signed-off-by: c-pozzi <corina.rios@gmail.com> * Fix ruff SIM117: combine nested with statements in tests Signed-off-by: c-pozzi <corina.rios@gmail.com> --------- Signed-off-by: c-pozzi <corina.rios@gmail.com> * Update version and changelog --------- Signed-off-by: c-pozzi <corina.rios@gmail.com> Co-authored-by: c-pozzi <37026778+c-pozzi@users.noreply.github.com>
1 parent 43a4411 commit be98784

4 files changed

Lines changed: 83 additions & 4 deletions

File tree

src/lightning/fabric/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
2424

2525
--
2626

27+
## [2.6.5] - 2026-05-27
28+
29+
### Changed
30+
31+
- Use fs.pipe() for S3/GCS checkpoint uploads in _atomic_save ([#21595](https://github.com/Lightning-AI/pytorch-lightning/pull/21595)
32+
2733
## [2.6.2] - 2026-03-19
2834

2935
### Fixed

src/lightning/fabric/utilities/cloud_io.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,21 @@ def _atomic_save(checkpoint: dict[str, Any], filepath: _PATH) -> None:
100100
try:
101101
# We use a transaction here to avoid file corruption if the save gets interrupted
102102
fs, urlpath = fsspec.core.url_to_fs(str(filepath))
103-
with fs.transaction, fs.open(urlpath, "wb") as f:
104-
f.write(bytesbuffer.getvalue())
103+
with fs.transaction:
104+
is_azure = False
105+
if module_available("adlfs"):
106+
from adlfs import AzureBlobFileSystem
107+
108+
is_azure = isinstance(fs, AzureBlobFileSystem)
109+
110+
if _is_object_storage(fs) and not is_azure:
111+
# Use fs.pipe() for S3/GCS where it triggers parallel multipart uploads,
112+
# giving 4-5x throughput improvement for checkpoints >= 500 MB.
113+
# Azure is excluded because adlfs stages blocks sequentially, making pipe() slower.
114+
fs.pipe(urlpath, bytesbuffer.getvalue())
115+
else:
116+
with fs.open(urlpath, "wb") as f:
117+
f.write(bytesbuffer.getvalue())
105118
except PermissionError as e:
106119
if isinstance(e.__context__, OSError) and getattr(e.__context__, "errno", None) == errno.EXDEV:
107120
raise RuntimeError(

src/version.info

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
2.6.4
1+
2.6.5

tests/tests_fabric/utilities/test_cloud_io.py

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
import os
15+
from unittest import mock
1516

1617
import fsspec
18+
import torch
1719
from fsspec.implementations.local import LocalFileSystem
1820
from fsspec.spec import AbstractFileSystem
1921

20-
from lightning.fabric.utilities.cloud_io import _is_dir, get_filesystem
22+
from lightning.fabric.utilities.cloud_io import _atomic_save, _is_dir, get_filesystem
2123

2224

2325
def test_get_filesystem_custom_filesystem():
@@ -90,3 +92,61 @@ def isfile(self, path):
9092
assert _is_dir(get_filesystem(s3_directory), s3_directory, strict=True)
9193
assert not _is_dir(get_filesystem(s3_directory), s3_file)
9294
assert not _is_dir(get_filesystem(s3_directory), s3_file, strict=True)
95+
96+
97+
def test_atomic_save_uses_pipe_for_s3(tmp_path):
98+
"""Test that _atomic_save uses fs.pipe() for S3 filesystems."""
99+
checkpoint = {"key": torch.tensor([1, 2, 3])}
100+
filepath = "s3://bucket/checkpoint.ckpt"
101+
102+
mock_fs = mock.MagicMock()
103+
mock_fs.__class__.__name__ = "S3FileSystem"
104+
105+
with (
106+
mock.patch("lightning.fabric.utilities.cloud_io._is_object_storage", return_value=True),
107+
mock.patch("fsspec.core.url_to_fs", return_value=(mock_fs, "bucket/checkpoint.ckpt")),
108+
):
109+
_atomic_save(checkpoint, filepath)
110+
111+
mock_fs.pipe.assert_called_once()
112+
mock_fs.open.assert_not_called()
113+
114+
115+
def test_atomic_save_uses_write_for_azure(tmp_path):
116+
"""Test that _atomic_save uses f.write() for Azure filesystems."""
117+
import sys
118+
import types
119+
120+
checkpoint = {"key": torch.tensor([1, 2, 3])}
121+
filepath = "azure://container/checkpoint.ckpt"
122+
123+
# Create a fake adlfs module so isinstance check works
124+
AzureBlobFileSystem = type("AzureBlobFileSystem", (), {})
125+
fake_adlfs = types.ModuleType("adlfs")
126+
fake_adlfs.AzureBlobFileSystem = AzureBlobFileSystem
127+
128+
mock_fs = mock.MagicMock()
129+
mock_fs.__class__ = AzureBlobFileSystem
130+
131+
with (
132+
mock.patch.dict(sys.modules, {"adlfs": fake_adlfs}),
133+
mock.patch("lightning.fabric.utilities.cloud_io.module_available", return_value=True),
134+
mock.patch("lightning.fabric.utilities.cloud_io._is_object_storage", return_value=True),
135+
mock.patch("fsspec.core.url_to_fs", return_value=(mock_fs, "container/checkpoint.ckpt")),
136+
):
137+
_atomic_save(checkpoint, filepath)
138+
139+
mock_fs.pipe.assert_not_called()
140+
mock_fs.open.assert_called_once()
141+
142+
143+
def test_atomic_save_uses_write_for_local(tmp_path):
144+
"""Test that _atomic_save uses f.write() for local filesystems."""
145+
checkpoint = {"key": torch.tensor([1, 2, 3])}
146+
filepath = tmp_path / "checkpoint.ckpt"
147+
148+
_atomic_save(checkpoint, filepath)
149+
150+
assert filepath.exists()
151+
loaded = torch.load(filepath, weights_only=True)
152+
torch.testing.assert_close(loaded["key"], checkpoint["key"])

0 commit comments

Comments
 (0)