@@ -339,9 +339,36 @@ async def verify_html(
339339 return DomainVerificationResponse (status = "error" , domainVerification = "Failure" )
340340
341341
342+ def _is_safe_public_ip (ip ) -> bool :
343+ """
344+ Return True only for routable public IPs (IPv4 or IPv6).
345+
346+ IPv4-mapped IPv6 addresses (e.g. ``::ffff:127.0.0.1``) are unwrapped and the
347+ embedded IPv4 is validated, so they cannot be used to smuggle an internal
348+ target past the IPv4 checks.
349+ """
350+ mapped = getattr (ip , "ipv4_mapped" , None )
351+ if mapped is not None :
352+ ip = mapped
353+
354+ return not (
355+ ip .is_private
356+ or ip .is_loopback
357+ or ip .is_link_local
358+ or ip .is_multicast
359+ or ip .is_reserved
360+ or ip .is_unspecified
361+ )
362+
363+
342364def get_safe_domain_ip (domain : str ) -> Union [str , None ]:
343365 """
344- Resolve domain and return a safe IP address if validation passes.
366+ Resolve a domain (IPv4 and IPv6) and return a safe public IP to pin to.
367+
368+ Strict policy: every resolved address across both families must be
369+ public/safe, otherwise the domain is rejected (prevents SSRF and
370+ DNS-rebinding via a mixed public/private record set). IPv4 is preferred
371+ when both families are available, falling back to IPv6.
345372
346373 Args:
347374 domain: The domain to check
@@ -350,32 +377,36 @@ def get_safe_domain_ip(domain: str) -> Union[str, None]:
350377 str: A validated public IP address, or None if domain is unsafe
351378 """
352379 try :
353- # Resolve domain to IP addresses
354- ip_addresses = socket .getaddrinfo (domain , None , socket .AF_INET )
380+ # Dual-stack resolve (A + AAAA records)
381+ ip_addresses = socket .getaddrinfo (
382+ domain , 443 , family = socket .AF_UNSPEC , type = socket .SOCK_STREAM
383+ )
355384
356385 if not ip_addresses :
357386 return None
358387
359- # Check all resolved IPs - ALL must be safe
360- validated_ips = []
388+ ipv4_safe = []
389+ ipv6_safe = []
361390 for result in ip_addresses :
362391 ip_str = result [4 ][0 ]
363- ip = ipaddress .ip_address (ip_str )
364-
365- # Check if IP is private, loopback, link-local, multicast, or reserved
366- if (
367- ip .is_private
368- or ip .is_loopback
369- or ip .is_link_local
370- or ip .is_multicast
371- or ip .is_reserved
372- ):
392+ # Strip any IPv6 scope id (e.g. fe80::1%eth0) before parsing
393+ ip = ipaddress .ip_address (ip_str .split ("%" )[0 ])
394+
395+ # Strict: a single unsafe address rejects the whole domain
396+ if not _is_safe_public_ip (ip ):
373397 return None
374398
375- validated_ips .append (ip_str )
399+ if ip .version == 4 :
400+ ipv4_safe .append (ip_str )
401+ else :
402+ ipv6_safe .append (ip_str )
376403
377- # Return the first validated IP
378- return validated_ips [0 ] if validated_ips else None
404+ # Prefer IPv4, fall back to IPv6
405+ if ipv4_safe :
406+ return ipv4_safe [0 ]
407+ if ipv6_safe :
408+ return ipv6_safe [0 ]
409+ return None
379410
380411 except (socket .gaierror , ValueError , OSError ):
381412 # DNS resolution failed or invalid IP
@@ -419,7 +450,9 @@ async def check_file(domain: str, prefix: str, code: str) -> bool:
419450 }
420451
421452 # Use the validated IP directly in the URL to prevent DNS rebinding
422- url = f"https://{ validated_ip } /{ code } .html"
453+ # (bracket IPv6 literals for a valid URL authority)
454+ host = f"[{ validated_ip } ]" if ":" in validated_ip else validated_ip
455+ url = f"https://{ host } /{ code } .html"
423456 token = f"{ prefix } ={ code } "
424457
425458 try :
0 commit comments