Skip to content

Conversation

@im-soohyun
Copy link

Overview

This pull request simplifies the secure_filename function used in the /api/download endpoint.
The goal is to prevent basic path injection attacks while reducing unnecessary logic and improving maintainability.

Changes

def secure_filename(filename: str) -> str:
    filename = unicodedata.normalize("NFKD", filename)
    filename = filename.encode("ascii", "ignore").decode("ascii")
    for sep in (os.sep, os.path.altsep):
        if sep:
            filename = filename.replace(sep, " ")
    filename = "_".join(filename.split())
    filename = filename.strip("._")
    return filename
  • Updated secure_filename function

    • Applies Unicode normalization and ASCII conversion.
    • Replaces OS-specific path separators (/, \) with spaces, then converts spaces to underscores.
    • Removes leading/trailing dots and underscores to normalize the filename.
  • Modified /api/download endpoint

    • Applies the new secure_filename function to sanitize the file query parameter.
    • Returns HTTP 400 if the sanitized result is empty.
    • If the path exists, returns the file directly or compresses the directory into a ZIP archive before returning.

Motivation

  • Security: Mitigates basic path traversal attacks through minimal yet effective filename sanitization.
  • Simplicity: Removes unnecessary complexity and focuses only on what's needed to safely process filenames.
  • Function Preservation: The existing behavior of serving files or zipped directories remains intact.

Notes

  • No other APIs or logic are affected by this change.
  • This PR strictly targets minimal secure handling for the download endpoint.

Thank you for reviewing!

@vercel
Copy link

vercel bot commented Mar 28, 2025

The latest updates on your projects. Learn more about Vercel for Git ↗︎

Name Status Preview Comments Updated (UTC)
composio ✅ Ready (Inspect) Visit Preview 💬 Add feedback Apr 17, 2025 3:30am

Comment on lines +325 to +326
for sep in (os.sep, os.path.altsep):
if sep:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The os module is used in secure_filename() but not imported at the top of the file, which will cause a NameError when the function is called.

Copy link
Contributor

@angrybayblade angrybayblade left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shyun020 can you add some unittests?

@angrybayblade angrybayblade changed the base branch from master to development April 2, 2025 05:47
@im-soohyun
Copy link
Author

🧪 Unit Tests for Path Traversal Defense Logic

Hello,

I have added unit tests to verify that the path traversal defense logic in the /api/download endpoint works correctly.

Testing the secure_filename() Function

  • Verify the normalization results for various inputs, including valid filenames, relative paths (../), and whitespace-only strings.

Testing the /api/download API

  • Validate responses for scenarios such as missing file parameters, invalid filenames, non-existent files, and valid file requests.

Filtering Bypass Attempts

  • Test the defense against various bypass inputs including ../, ..\\, URL-encoded, and double-encoded patterns.

Directory Compression Download Test

  • Verify the creation of a zip archive for directories containing subfiles and check the internal file list.

✅ Test Environment

For testing purposes, the following files/directories were created:

  • Created normal_file.txt in the current directory.
  • Created a.txt and b.txt inside the testdir/ directory.
#!/usr/bin/env python3
import os
import unicodedata
import zipfile
from pathlib import Path
import tempfile
import typing as t

# Patch for Pydantic v2 compatibility: Allow BaseModel subscripting
from pydantic import BaseModel
BaseModel.__class_getitem__ = classmethod(lambda cls, item: cls)

from fastapi import FastAPI, HTTPException, Response
from fastapi.responses import FileResponse
from fastapi.testclient import TestClient

# -------------------------------
# 🔹 FastAPI app setup
app = FastAPI()

# -------------------------------
# 🔹 Unified API response model
class APIResponse(BaseModel):
    data: t.Optional[t.Any] = None
    error: t.Optional[str] = None
    traceback: t.Optional[str] = None

    def model_dump_json(self) -> str:
        import json
        return json.dumps(self.dict())

# -------------------------------
# 🔹 API endpoint: file or directory download
@app.get("/api/download")
def _download_file_or_dir(file: t.Optional[str] = None):
    if not file:
        raise HTTPException(
            status_code=400, detail="File path is required as query parameter"
        )
    
    # Sanitize and normalize file input
    safe_file = secure_filename(file)
    if not safe_file:
        raise HTTPException(status_code=400, detail="Invalid file path after sanitization")
    
    path = Path(safe_file)
    if not path.exists():
        return Response(
            content=APIResponse[None](
                data=None,
                error=f"{path} not found",
            ).model_dump_json(),
            status_code=404,
        )

    # If it's a file, return as-is
    if path.is_file():
        return FileResponse(path=path)
    
    # If it's a directory, create a zip and return
    tempdir = tempfile.TemporaryDirectory()
    zipfile_path = Path(tempdir.name, path.name + ".zip")
    return FileResponse(path=_archive(directory=path, output=zipfile_path))

# -------------------------------
# 🔹 Directory archiving logic
def _archive(directory: Path, output: Path) -> Path:
    with zipfile.ZipFile(output, "w", zipfile.ZIP_DEFLATED) as fp:
        for root, _, files in os.walk(directory):
            for file in files:
                fp.write(
                    os.path.join(root, file),
                    os.path.relpath(
                        os.path.join(root, file),
                        os.path.join(directory, ".."),
                    ),
                )
    return output

# -------------------------------
# 🔹 File name sanitization to prevent path traversal
def secure_filename(filename: str) -> str:
    filename = unicodedata.normalize("NFKD", filename)
    filename = filename.encode("ascii", "ignore").decode("ascii")
    for sep in (os.sep, os.path.altsep):
        if sep:
            filename = filename.replace(sep, " ")
    filename = "_".join(filename.split())
    filename = filename.strip("._")
    return filename

# -------------------------------
# 🔹 FastAPI test client for unit testing
client = TestClient(app)

# -------------------------------
# 🔹 Run path injection test cases
def run_tests():
    print("📌 Test secure_filename:")
    # Test secure_filename with normal and malicious inputs
    print("  normal:     ", secure_filename("normal_file.txt"))
    print("  traversal:  ", secure_filename("../secret.txt"))
    print("  whitespace: ", secure_filename("    "))

    print("\n📌 Test download API:")
    # Test when no file parameter is given
    print("  No file param:")
    r = client.get("/api/download")
    print("    ->", r.status_code, r.text)

    # Test with invalid (only whitespace) filename
    print("\n  Invalid file path (just spaces):")
    r = client.get("/api/download", params={"file": "    "})
    print("    ->", r.status_code, r.text)

    # Test when file does not exist
    print("\n  File not found:")
    cwd = os.getcwd()
    with tempfile.TemporaryDirectory() as tmpdir:
        os.chdir(tmpdir)
        r = client.get("/api/download", params={"file": "nonexistent.txt"})
        print("    ->", r.status_code, r.text)
        os.chdir(cwd)

    # Test with a valid existing file
    print("\n📌 Valid file reading (normal_file.txt):")
    cwd = os.getcwd()
    with tempfile.TemporaryDirectory() as tmpdir:
        os.chdir(tmpdir)
        with open("normal_file.txt", "w") as f:
            f.write("Test content for valid file.")
        r = client.get("/api/download", params={"file": "normal_file.txt"})
        if r.status_code == 200:
            print("    ->", r.status_code, r.content.decode())
        else:
            print("    ->", r.status_code, r.text)
        os.chdir(cwd)

    # -------------------------------
    # 🔹 Test bypass attempts with traversal-like paths
    print("\n📌 Filtering bypass attempts:")

    # Case 1: "secret/../normal_file.txt"
    print("  Attempt with 'secret/../normal_file.txt':")
    cwd = os.getcwd()
    with tempfile.TemporaryDirectory() as tmpdir:
        os.chdir(tmpdir)
        with open("normal_file.txt", "w") as f:
            f.write("Test content for valid file.")
        r = client.get("/api/download", params={"file": "secret/../normal_file.txt"})
        print("    ->", r.status_code, r.text)
        os.chdir(cwd)

    # Case 2: Encoded backslash "..%5cnormal_file.txt"
    print("  Attempt with encoded backslash ('..%5cnormal_file.txt'):")
    cwd = os.getcwd()
    with tempfile.TemporaryDirectory() as tmpdir:
        os.chdir(tmpdir)
        with open("normal_file.txt", "w") as f:
            f.write("Test content for valid file.")
        r = client.get("/api/download", params={"file": "..\\normal_file.txt"})
        print("    ->", r.status_code, r.text)
        os.chdir(cwd)

    # Case 3: Various encoded traversal strings
    encoded_test_cases = [
        ("%2e%2e%2f", "Encoded '../' pattern: %2e%2e%2f"),
        ("%2e%2e/", "Encoded '../' pattern: %2e%2e/"),
        ("..%2f", "Encoded '../' pattern: ..%2f"),
        ("%2e%2e%5c", "Encoded '..\\' pattern: %2e%2e%5c"),
        ("%2e%2e\\", "Encoded '..\\' pattern: %2e%2e\\"),
        ("..%5c", "Encoded '..\\' pattern: ..%5c"),
        ("%252e%252e%255c", "Double-encoded '..\\' pattern: %252e%252e%255c"),
        ("..%255c", "Double-encoded '..\\' pattern: ..%255c"),
    ]
    print("  Various URL encoded patterns:")
    for case, description in encoded_test_cases:
        with tempfile.TemporaryDirectory() as tmpdir:
            os.chdir(tmpdir)
            with open("normal_file.txt", "w") as f:
                f.write("Test content for valid file.")
            r = client.get("/api/download", params={"file": case})
            print(f"    {description} ->", r.status_code, r.text)
            os.chdir(cwd)

    # -------------------------------
    # 🔹 Test zip archive creation
    print("\n📌 Test archive creation:")
    with tempfile.TemporaryDirectory() as tmpdir:
        test_dir = Path(tmpdir) / "testdir"
        test_dir.mkdir()
        (test_dir / "a.txt").write_text("hello")
        (test_dir / "b.txt").write_text("world")
        zip_path = Path(tmpdir) / "zipped.zip"
        _archive(test_dir, zip_path)
        print("  Archive created:", zip_path.exists())
        with zipfile.ZipFile(zip_path) as z:
            print("  Zip contents:", z.namelist())

# -------------------------------
# 🔹 Run all test cases when executed directly
if __name__ == "__main__":
    run_tests()

unit-test

@im-soohyun im-soohyun requested a review from angrybayblade April 4, 2025 01:34
@im-soohyun
Copy link
Author

@angrybayblade I'd appreciate it if you could review the unit tests I wrote 🙏

@im-soohyun
Copy link
Author

How long will it take for the feedback?

@im-soohyun
Copy link
Author

@angrybayblade I'd appreciate it if you could review the unit tests I wrote 🙏

@im-soohyun
Copy link
Author

@angrybayblade

Thank you very much for taking the time to review my work.

May I kindly ask if there are any errors in the code I submitted via the Pull Request?

If so, could you please let me know what the specific issues are?

@im-soohyun
Copy link
Author

@angrybayblade

I’m very eager to contribute to this vulnerability-related issue.

May I kindly ask what steps I should take to help resolve it?

@im-soohyun
Copy link
Author

@angrybayblade

When you have time, I would greatly appreciate it if you could kindly let me know which parts contain errors so that I can make the necessary corrections.

Thank you very much for your time and consideration.

@im-soohyun
Copy link
Author

@angrybayblade

knock knock

2 similar comments
@im-soohyun
Copy link
Author

@angrybayblade

knock knock

@im-soohyun
Copy link
Author

@angrybayblade

knock knock

@haxzie haxzie closed this Sep 4, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants