Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions framework/py/flwr/cli/archive_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Copyright 2026 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Archive helpers for CLI commands."""

import zipfile
from pathlib import Path

import click


def safe_extract_zip(
zf: zipfile.ZipFile,
dest_dir: Path,
) -> None:
"""Extract ZIP contents safely into the destination directory.

This prevents path traversal (zip-slip) by validating that each member path resolves
within ``dest_dir`` before extraction.
"""
base_dir = dest_dir.resolve()

for member in zf.infolist():
target = (base_dir / member.filename).resolve()
try:
target.relative_to(base_dir)
except ValueError:
raise click.ClickException(
f"Unsafe path in FAB archive: {member.filename}"
) from None

zf.extractall(base_dir)
3 changes: 2 additions & 1 deletion framework/py/flwr/cli/install.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from flwr.common.constant import APP_DIR, FAB_HASH_TRUNCATION
from flwr.supercore.utils import get_flwr_home

from .archive_utils import safe_extract_zip
from .config_utils import load_and_validate
from .utils import get_sha256_hash

Expand Down Expand Up @@ -110,8 +111,8 @@ def install_from_fab(

with tempfile.TemporaryDirectory() as tmpdir:
with zipfile.ZipFile(fab_file_archive, "r") as zipf:
zipf.extractall(tmpdir)
tmpdir_path = Path(tmpdir)
safe_extract_zip(zipf, tmpdir_path)
info_dir = tmpdir_path / ".info"
if not info_dir.exists():
raise click.ClickException("FAB file has incorrect format.")
Expand Down
78 changes: 78 additions & 0 deletions framework/py/flwr/cli/install_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# Copyright 2026 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for Flower command line interface `install` command."""


import io
import zipfile
from pathlib import Path

import click
import pytest

from .archive_utils import safe_extract_zip
from .install import install_from_fab


def _zip_bytes(entries: list[tuple[str, bytes]]) -> bytes:
"""Create ZIP bytes from (path, content) entries."""
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
for name, content in entries:
zf.writestr(name, content)
return buf.getvalue()


def test_safe_extract_zip_extracts_regular_files(tmp_path: Path) -> None:
"""Safe extraction should succeed for regular archive entries."""
zip_bytes = _zip_bytes([("dir/file.txt", b"hello")])

with zipfile.ZipFile(io.BytesIO(zip_bytes), "r") as zf:
safe_extract_zip(zf, tmp_path)

assert (tmp_path / "dir" / "file.txt").read_bytes() == b"hello"


def test_safe_extract_zip_rejects_parent_traversal(tmp_path: Path) -> None:
"""Safe extraction should reject path traversal via '..' entries."""
zip_bytes = _zip_bytes([("../evil.txt", b"x")])

with zipfile.ZipFile(io.BytesIO(zip_bytes), "r") as zf:
with pytest.raises(click.ClickException, match="Unsafe path in FAB archive"):
safe_extract_zip(zf, tmp_path)

assert not (tmp_path.parent / "evil.txt").exists()


def test_safe_extract_zip_rejects_absolute_paths(tmp_path: Path) -> None:
"""Safe extraction should reject absolute archive paths."""
zip_bytes = _zip_bytes([("/tmp/evil.txt", b"x")])

with zipfile.ZipFile(io.BytesIO(zip_bytes), "r") as zf:
with pytest.raises(click.ClickException, match="Unsafe path in FAB archive"):
safe_extract_zip(zf, tmp_path)


def test_install_from_fab_rejects_zip_slip(tmp_path: Path) -> None:
"""install_from_fab should fail fast on zip-slip entries."""
fab_bytes = _zip_bytes(
[
("../evil.txt", b"x"),
(".info/CONTENT", b""),
]
)

with pytest.raises(click.ClickException, match="Unsafe path in FAB archive"):
_ = install_from_fab(fab_bytes, install_dir=tmp_path, skip_prompt=True)
38 changes: 2 additions & 36 deletions framework/py/flwr/cli/new/new.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from flwr.supercore.constant import PLATFORM_API_URL
from flwr.supercore.utils import parse_app_spec, request_download_link

from ..archive_utils import safe_extract_zip
from ..utils import prompt_options, prompt_text


Expand Down Expand Up @@ -141,41 +142,6 @@ def fetch_recommended_apps() -> list[dict[str, str]]:
raise click.ClickException(f"Failed to fetch recommended apps: {e}") from e


# Security: prevent zip-slip
def _safe_extract_zip(zf: zipfile.ZipFile, dest_dir: Path) -> None:
"""Extract ZIP file into destination directory."""
dest_dir = dest_dir.resolve()

def _is_within_directory(base: Path, target: Path) -> bool:
try:
target.relative_to(base)
return True
except ValueError:
return False

for member in zf.infolist():
# Skip directory placeholders;
# ZipInfo can represent them as names ending with '/'.
if member.is_dir():
target_path = (dest_dir / member.filename).resolve()
if not _is_within_directory(dest_dir, target_path):
raise ValueError(f"Unsafe path in zip: {member.filename}")
target_path.mkdir(parents=True, exist_ok=True)
continue

# Files
target_path = (dest_dir / member.filename).resolve()
if not _is_within_directory(dest_dir, target_path):
raise ValueError(f"Unsafe path in zip: {member.filename}")

# Ensure parent exists
target_path.parent.mkdir(parents=True, exist_ok=True)

# Extract
with zf.open(member, "r") as src, open(target_path, "wb") as dst:
dst.write(src.read())


def _download_zip_to_memory(presigned_url: str) -> io.BytesIO:
"""Download ZIP file from Platform API to memory."""
try:
Expand Down Expand Up @@ -238,6 +204,6 @@ def download_remote_app_via_api(app_spec: str) -> None:
bold=True,
)
with zipfile.ZipFile(zip_buf) as zf:
_safe_extract_zip(zf, Path.cwd())
safe_extract_zip(zf, Path.cwd())

print_success_prompt(app_name)
40 changes: 40 additions & 0 deletions framework/py/flwr/cli/new/new_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,19 @@
"""Test for Flower command line interface `new` command."""


import importlib
import io
import zipfile
from pathlib import Path
from unittest.mock import Mock

import click
import pytest

from .new import download_remote_app_via_api

new_module = importlib.import_module("flwr.cli.new.new")


@pytest.mark.parametrize(
"value",
Expand All @@ -40,3 +48,35 @@ def test_download_remote_app_via_api_rejects_invalid_formats(value: str) -> None

# Ensure we specifically exited with code 1
assert exc.value.exit_code == 1


def test_download_remote_app_via_api_rejects_zip_slip(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
"""Reject app ZIP archives containing path traversal entries."""

def _zip_bytes(entries: list[tuple[str, bytes]]) -> bytes:
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
for name, content in entries:
zf.writestr(name, content)
return buf.getvalue()

malicious_zip = _zip_bytes([("../evil.txt", b"x")])
mock_response = Mock(content=malicious_zip, raise_for_status=lambda: None)

monkeypatch.chdir(tmp_path)
monkeypatch.setattr(
new_module,
"request_download_link",
lambda *_args, **_kwargs: ("https://example.invalid/fake.zip", None),
)
monkeypatch.setattr(
new_module.requests,
"get",
lambda *_args, **_kwargs: mock_response,
)

with pytest.raises(click.ClickException, match="Unsafe path in FAB archive"):
download_remote_app_via_api("@account/app==1.2.3")