From 342901a9693d632bd841ba6d4267ba06a907e23d Mon Sep 17 00:00:00 2001 From: Eugene Yurtsev Date: Sat, 14 Mar 2026 11:16:39 -0400 Subject: [PATCH 1/3] x --- .../_security/_ssrf_protection.py | 43 ++++++++++++++----- .../tests/unit_tests/test_ssrf_protection.py | 10 +++++ 2 files changed, 42 insertions(+), 11 deletions(-) diff --git a/libs/core/langchain_core/_security/_ssrf_protection.py b/libs/core/langchain_core/_security/_ssrf_protection.py index 5afa573c0c229..20632c9a9491f 100644 --- a/libs/core/langchain_core/_security/_ssrf_protection.py +++ b/libs/core/langchain_core/_security/_ssrf_protection.py @@ -68,6 +68,21 @@ ] +def _normalize_ip(ip_str: str) -> str: + """Normalize IP strings for consistent SSRF checks. + + Args: + ip_str: IP address as a string. + + Returns: + Canonical string form, converting IPv6-mapped IPv4 to plain IPv4. + """ + ip = ipaddress.ip_address(ip_str) + if isinstance(ip, ipaddress.IPv6Address) and ip.ipv4_mapped is not None: + return str(ip.ipv4_mapped) + return str(ip) + + def is_private_ip(ip_str: str) -> bool: """Check if an IP address is in a private range. @@ -78,7 +93,7 @@ def is_private_ip(ip_str: str) -> bool: True if IP is in a private range, False otherwise """ try: - ip = ipaddress.ip_address(ip_str) + ip = ipaddress.ip_address(_normalize_ip(ip_str)) return any(ip in range_ for range_ in PRIVATE_IP_RANGES) except ValueError: return False @@ -99,8 +114,12 @@ def is_cloud_metadata(hostname: str, ip_str: str | None = None) -> bool: return True # Check IP - if ip_str and ip_str in CLOUD_METADATA_IPS: # noqa: SIM103 - return True + if ip_str: + try: + if _normalize_ip(ip_str) in CLOUD_METADATA_IPS: + return True + except ValueError: + pass return False @@ -122,12 +141,13 @@ def is_localhost(hostname: str, ip_str: str | None = None) -> bool: # Check IP if ip_str: try: - ip = ipaddress.ip_address(ip_str) + normalized_ip = _normalize_ip(ip_str) + ip = ipaddress.ip_address(normalized_ip) # Check if loopback if ip.is_loopback: return True # Also check common localhost IPs - if ip_str in ("127.0.0.1", "::1", "0.0.0.0"): # noqa: S104 + if normalized_ip in ("127.0.0.1", "::1", "0.0.0.0"): # noqa: S104 return True except ValueError: pass @@ -225,20 +245,21 @@ def validate_safe_url( for result in addr_info: ip_str: str = result[4][0] # type: ignore[assignment] + normalized_ip = _normalize_ip(ip_str) # ALWAYS block cloud metadata IPs - if is_cloud_metadata(hostname, ip_str): - msg = f"URL resolves to cloud metadata IP: {ip_str}" + if is_cloud_metadata(hostname, normalized_ip): + msg = f"URL resolves to cloud metadata IP: {normalized_ip}" raise ValueError(msg) # Check for localhost IPs - if is_localhost(hostname, ip_str) and not allow_private: - msg = f"URL resolves to localhost IP: {ip_str}" + if is_localhost(hostname, normalized_ip) and not allow_private: + msg = f"URL resolves to localhost IP: {normalized_ip}" raise ValueError(msg) # Check for private IPs - if not allow_private and is_private_ip(ip_str): - msg = f"URL resolves to private IP address: {ip_str}" + if not allow_private and is_private_ip(normalized_ip): + msg = f"URL resolves to private IP address: {normalized_ip}" raise ValueError(msg) except socket.gaierror as e: diff --git a/libs/core/tests/unit_tests/test_ssrf_protection.py b/libs/core/tests/unit_tests/test_ssrf_protection.py index fc9fd8d10644e..8aa0de7b8a331 100644 --- a/libs/core/tests/unit_tests/test_ssrf_protection.py +++ b/libs/core/tests/unit_tests/test_ssrf_protection.py @@ -143,6 +143,16 @@ def test_cloud_metadata_always_blocked(self) -> None: allow_private=True, ) + def test_ipv6_mapped_ipv4_localhost_blocked(self) -> None: + """Test that IPv6-mapped IPv4 localhost is blocked.""" + with pytest.raises(ValueError, match="localhost"): + validate_safe_url("http://[::ffff:127.0.0.1]:8080/webhook") + + def test_ipv6_mapped_ipv4_cloud_metadata_blocked(self) -> None: + """Test that IPv6-mapped IPv4 cloud metadata is blocked.""" + with pytest.raises(ValueError, match="metadata"): + validate_safe_url("http://[::ffff:169.254.169.254]/latest/meta-data/") + def test_invalid_scheme_blocked(self) -> None: """Test that non-HTTP(S) schemes are blocked.""" with pytest.raises(ValueError, match="scheme"): From c14cd9636208eca6dc5c7dfe55ea8d7f907602e0 Mon Sep 17 00:00:00 2001 From: Eugene Yurtsev Date: Tue, 17 Mar 2026 16:58:11 -0400 Subject: [PATCH 2/3] fix(core): block link-local metadata range --- .../langchain_core/_security/_ssrf_protection.py | 15 ++++++++++++++- .../core/tests/unit_tests/test_ssrf_protection.py | 9 +++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/libs/core/langchain_core/_security/_ssrf_protection.py b/libs/core/langchain_core/_security/_ssrf_protection.py index 20632c9a9491f..cefd48465b628 100644 --- a/libs/core/langchain_core/_security/_ssrf_protection.py +++ b/libs/core/langchain_core/_security/_ssrf_protection.py @@ -49,10 +49,18 @@ ] # Cloud provider metadata endpoints +CLOUD_METADATA_RANGES = [ + ipaddress.ip_network("169.254.0.0/16"), # IPv4 link-local (used by metadata services) +] + CLOUD_METADATA_IPS = [ "169.254.169.254", # AWS, GCP, Azure, DigitalOcean, Oracle Cloud "169.254.170.2", # AWS ECS task metadata + "169.254.170.23", # AWS EKS Pod Identity Agent "100.100.100.200", # Alibaba Cloud metadata + "fd00:ec2::254", # AWS EC2 IMDSv2 over IPv6 (Nitro instances) + "fd00:ec2::23", # AWS EKS Pod Identity Agent (IPv6) + "fe80::a9fe:a9fe", # OpenStack Nova metadata (IPv6 link-local equiv of 169.254.169.254) ] CLOUD_METADATA_HOSTNAMES = [ @@ -116,7 +124,12 @@ def is_cloud_metadata(hostname: str, ip_str: str | None = None) -> bool: # Check IP if ip_str: try: - if _normalize_ip(ip_str) in CLOUD_METADATA_IPS: + normalized_ip = _normalize_ip(ip_str) + if normalized_ip in CLOUD_METADATA_IPS: + return True + + ip = ipaddress.ip_address(normalized_ip) + if any(ip in range_ for range_ in CLOUD_METADATA_RANGES): return True except ValueError: pass diff --git a/libs/core/tests/unit_tests/test_ssrf_protection.py b/libs/core/tests/unit_tests/test_ssrf_protection.py index 8aa0de7b8a331..ea0768025bb1b 100644 --- a/libs/core/tests/unit_tests/test_ssrf_protection.py +++ b/libs/core/tests/unit_tests/test_ssrf_protection.py @@ -50,7 +50,16 @@ def test_is_cloud_metadata_ips(self) -> None: """Test cloud metadata IP detection.""" assert is_cloud_metadata("example.com", "169.254.169.254") is True assert is_cloud_metadata("example.com", "169.254.170.2") is True + assert is_cloud_metadata("example.com", "169.254.170.23") is True assert is_cloud_metadata("example.com", "100.100.100.200") is True + assert is_cloud_metadata("example.com", "fd00:ec2::254") is True + assert is_cloud_metadata("example.com", "fd00:ec2::23") is True + assert is_cloud_metadata("example.com", "fe80::a9fe:a9fe") is True + + def test_is_cloud_metadata_link_local_range(self) -> None: + """Test that IPv4 link-local is flagged as cloud metadata.""" + assert is_cloud_metadata("example.com", "169.254.1.2") is True + assert is_cloud_metadata("example.com", "169.254.255.254") is True def test_is_cloud_metadata_hostnames(self) -> None: """Test cloud metadata hostname detection.""" From e65d0cdf0930bf33ebe225dd808b3fd39b7d2e06 Mon Sep 17 00:00:00 2001 From: Eugene Yurtsev Date: Tue, 17 Mar 2026 16:59:34 -0400 Subject: [PATCH 3/3] chore(core): format --- libs/core/langchain_core/_security/_ssrf_protection.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/libs/core/langchain_core/_security/_ssrf_protection.py b/libs/core/langchain_core/_security/_ssrf_protection.py index cefd48465b628..5c3df6994c95f 100644 --- a/libs/core/langchain_core/_security/_ssrf_protection.py +++ b/libs/core/langchain_core/_security/_ssrf_protection.py @@ -50,7 +50,9 @@ # Cloud provider metadata endpoints CLOUD_METADATA_RANGES = [ - ipaddress.ip_network("169.254.0.0/16"), # IPv4 link-local (used by metadata services) + ipaddress.ip_network( + "169.254.0.0/16" + ), # IPv4 link-local (used by metadata services) ] CLOUD_METADATA_IPS = [ @@ -60,7 +62,8 @@ "100.100.100.200", # Alibaba Cloud metadata "fd00:ec2::254", # AWS EC2 IMDSv2 over IPv6 (Nitro instances) "fd00:ec2::23", # AWS EKS Pod Identity Agent (IPv6) - "fe80::a9fe:a9fe", # OpenStack Nova metadata (IPv6 link-local equiv of 169.254.169.254) + "fe80::a9fe:a9fe", # OpenStack Nova metadata (IPv6 link-local equiv of + # 169.254.169.254) ] CLOUD_METADATA_HOSTNAMES = [