
ClickHouse Sink DNS Auto-Resolution for Load Balancing #23877

@sebinsunny

Description


A note for the community

Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
If you are interested in working on this issue or have submitted a pull request, please leave a comment

Use Cases

Add support for automatic DNS resolution in the ClickHouse sink to handle hostnames that resolve to multiple IP addresses for better load balancing.

Currently, Vector sinks call .next() on the DNS lookup result and use only the first resolved IP address. This causes problems such as:

Poor load distribution across ClickHouse instances: all inserts go to the first resolved IP.
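The difference between the two behaviours can be shown with a minimal std-only sketch. It uses the standard library's blocking `ToSocketAddrs` resolver rather than Vector's `dns::Resolver`, and the function names are hypothetical.

```rust
use std::net::{IpAddr, SocketAddr, ToSocketAddrs};

/// Current behaviour: keep only the first address the lookup yields.
fn first_address(addrs: impl IntoIterator<Item = IpAddr>) -> Option<IpAddr> {
    addrs.into_iter().next()
}

/// Proposed behaviour: keep every address, dropping any duplicates
/// while preserving the order in which they were returned.
fn all_addresses(addrs: impl IntoIterator<Item = IpAddr>) -> Vec<IpAddr> {
    let mut unique = Vec::new();
    for ip in addrs {
        if !unique.contains(&ip) {
            unique.push(ip);
        }
    }
    unique
}

/// Resolve `host` with the standard library's blocking resolver and return
/// every distinct IP address (IP literals are parsed without a DNS query).
fn resolve_all(host: &str, port: u16) -> std::io::Result<Vec<IpAddr>> {
    let socket_addrs = (host, port).to_socket_addrs()?;
    Ok(all_addresses(socket_addrs.map(|addr: SocketAddr| addr.ip())))
}
```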

Proposed Changes

Introduce a new auto_resolve_dns option for the ClickHouse sink that:

Resolves hostnames to all available IPs (e.g., clickhouse.example.com → [10.0.1.100, 10.0.1.101, 10.0.1.102]).
Reuses the distributed_service logic (similar to the Elasticsearch sink) to balance load across all resolved IPs.
Keeps authentication credentials intact in the resolved URLs.
Adds health checks for all resolved IPs to ensure reliability.
Defaults to false to maintain backward compatibility.
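The load-balancing idea can be illustrated with a simple round-robin picker over the resolved endpoint URLs. This is a hypothetical stand-in (`RoundRobin` is not a Vector type). Vector's distributed_service has its own selection and health logic, and this sketch does not reproduce it.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical round-robin picker over resolved endpoint URLs.
/// Simplified illustration only; not Vector's `distributed_service`.
struct RoundRobin {
    endpoints: Vec<String>,
    next: AtomicUsize,
}

impl RoundRobin {
    /// Returns `None` when there is nothing to balance across.
    fn new(endpoints: Vec<String>) -> Option<Self> {
        if endpoints.is_empty() {
            return None;
        }
        Some(Self { endpoints, next: AtomicUsize::new(0) })
    }

    /// Endpoint for the next request, cycling through all of them in order.
    /// The atomic counter lets `pick` be called through a shared reference.
    fn pick(&self) -> &str {
        let i = self.next.fetch_add(1, Ordering::Relaxed) % self.endpoints.len();
        &self.endpoints[i]
    }
}
```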

Attempted Solutions

Code Implementation

Configuration Option

/// Automatically resolve hostnames to all available IP addresses.
///
/// When enabled, the hostname in the endpoint will be resolved to all its IP addresses,
/// and Vector will load balance across all resolved IPs.
#[serde(default)]
pub auto_resolve_dns: bool,
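Because the option defaults to false, the sink only expands the endpoint when the option is explicitly enabled. The sketch below shows that decision using a hypothetical, trimmed-down config struct (`ClickhouseConfigSketch`) and an injected resolver function, so it runs without serde or DNS. The helper `endpoints_for` is also hypothetical.

```rust
/// Hypothetical, trimmed-down stand-in for the ClickHouse sink config;
/// only the fields needed to show the new option are included.
#[derive(Debug, Default)]
struct ClickhouseConfigSketch {
    endpoint: String,
    /// `false` by default (as with `#[serde(default)]` on a `bool`), so existing
    /// configurations keep sending everything to the configured endpoint.
    auto_resolve_dns: bool,
}

/// Decide which endpoint URLs the sink should use.
/// `resolve` is injected so the decision logic can be exercised without DNS.
fn endpoints_for<F>(config: &ClickhouseConfigSketch, resolve: F) -> Result<Vec<String>, String>
where
    F: FnOnce(&str) -> Result<Vec<String>, String>,
{
    if !config.auto_resolve_dns {
        // Backward-compatible path: use the endpoint exactly as configured.
        return Ok(vec![config.endpoint.clone()]);
    }
    let resolved = resolve(&config.endpoint)?;
    if resolved.is_empty() {
        return Err(format!("no addresses resolved for {}", config.endpoint));
    }
    Ok(resolved)
}
```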

DNS Resolution Logic

use std::net::{IpAddr, SocketAddr};

use http::Uri;

use crate::dns;

/// Resolve a single endpoint URL into one URL per resolved IP address.
///
/// Note: for https endpoints the resulting URLs contain IPs instead of the
/// original hostname, so TLS certificate verification may fail unless the
/// server certificate also covers those IPs.
async fn resolve_endpoint_to_ips(endpoint_str: &str) -> crate::Result<Vec<String>> {
    let uri: Uri = endpoint_str.parse().map_err(|e| format!("Invalid URI: {}", e))?;

    let host = uri.host().ok_or("URI must contain a host")?;
    // Use the explicit port if present; otherwise fall back to the scheme default.
    // A `match` is used because `unwrap_or` evaluates its argument (including the
    // early `return`) even when a port is present.
    let port = match uri.port_u16() {
        Some(port) => port,
        None => match uri.scheme_str() {
            Some("https") => 443,
            Some("http") => 80,
            _ => return Err("URI must have http or https scheme".into()),
        },
    };

    // Resolve the hostname to all of its IP addresses.
    let ips: Vec<IpAddr> = dns::Resolver
        .lookup_ip(host.to_string())
        .await
        .map_err(|e| format!("DNS resolution failed for {}: {}", host, e))?
        .collect();

    // Preserve userinfo (username:password) if present: everything before the last '@'.
    let userinfo = uri
        .authority()
        .and_then(|authority| authority.as_str().rsplit_once('@'))
        .map(|(userinfo, _)| userinfo.to_string());

    // Create one endpoint URL per resolved IP.
    let mut resolved_endpoints = Vec::with_capacity(ips.len());
    for ip in ips {
        // `SocketAddr`'s Display wraps IPv6 addresses in brackets ("[fd00::1]:8123");
        // a bare `format!("{}", ip)` would produce an invalid authority for IPv6.
        // The port is always written explicitly, which is equivalent for default ports.
        let host_port = SocketAddr::new(ip, port).to_string();
        let authority = match &userinfo {
            Some(userinfo) => format!("{}@{}", userinfo, host_port),
            None => host_port,
        };

        let mut new_uri_parts = uri.clone().into_parts();
        new_uri_parts.authority = Some(authority.parse().map_err(|e| format!("Invalid authority: {}", e))?);
        let new_uri = Uri::from_parts(new_uri_parts).map_err(|e| format!("Failed to construct URI: {}", e))?;
        resolved_endpoints.push(new_uri.to_string());
    }

    Ok(resolved_endpoints)
}
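The URL-rewriting step of resolve_endpoint_to_ips can be checked in isolation, without the http crate or DNS. The following std-only sketch shows credentials, port and path being carried over, and IPv6 addresses being bracketed. It makes some simplifying assumptions:

- The endpoint grammar is simplified to scheme://[userinfo@]host[:port][/path].
- The original hostname is not an IPv6 literal.
- The helper names (`EndpointParts`, `parse_endpoint`, `build_resolved_endpoints`) are hypothetical.

```rust
use std::net::{IpAddr, SocketAddr};

/// Pieces of an endpoint URL that survive DNS resolution unchanged.
struct EndpointParts<'a> {
    scheme: &'a str,
    userinfo: Option<&'a str>,
    port: u16,
    path: &'a str,
}

/// Minimal parser for "scheme://[userinfo@]host[:port][/path]".
/// Assumes an http/https URL with a non-IPv6 hostname; a real sink would
/// use a full URI parser instead.
fn parse_endpoint(endpoint: &str) -> Option<EndpointParts<'_>> {
    let (scheme, rest) = endpoint.split_once("://")?;
    // The authority ends at the first '/' (or at the end of the string).
    let end = rest.find('/').unwrap_or(rest.len());
    let (authority, path) = rest.split_at(end);
    // Userinfo is everything before the last '@'.
    let (userinfo, host_port) = match authority.rsplit_once('@') {
        Some((u, hp)) => (Some(u), hp),
        None => (None, authority),
    };
    let port = match host_port.rsplit_once(':') {
        Some((_, p)) => p.parse::<u16>().ok()?,
        None => match scheme {
            "https" => 443,
            "http" => 80,
            _ => return None,
        },
    };
    Some(EndpointParts { scheme, userinfo, port, path })
}

/// Build one URL per resolved IP, keeping scheme, credentials, port and path.
fn build_resolved_endpoints(parts: &EndpointParts<'_>, ips: &[IpAddr]) -> Vec<String> {
    ips.iter()
        .map(|&ip| {
            // SocketAddr's Display adds brackets for IPv6, e.g. "[fd00::1]:8443".
            let host_port = SocketAddr::new(ip, parts.port);
            match parts.userinfo {
                Some(u) => format!("{}://{}@{}{}", parts.scheme, u, host_port, parts.path),
                None => format!("{}://{}{}", parts.scheme, host_port, parts.path),
            }
        })
        .collect()
}
```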

Load Balancing Setup

let services = commons
    .iter()
    .cloned()
    .map(|common| {
        let endpoint = common.endpoint.to_string();
        let service: HttpService<ClickhouseServiceRequestBuilder, PartitionKey> =
            HttpService::new(client.clone(), common.service_request_builder);
        (endpoint, service)
    })
    .collect::<Vec<_>>();

let service = request_limits.distributed_service(
    ClickhouseRetryLogic::default(),
    services,
    Default::default(), // Use default health config
    ClickhouseHealthLogic,
    1,
);
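The proposal relies on a ClickhouseHealthLogic type to keep traffic away from failing IPs. As a rough illustration only, here is a hypothetical health-aware round-robin picker. It skips an endpoint for a cooldown period after a number of consecutive failures. This is not how distributed_service is implemented, and the `threshold` and `cooldown` parameters are assumptions.

```rust
use std::time::{Duration, Instant};

/// Hypothetical health-aware picker: an endpoint is skipped for `cooldown`
/// after `threshold` consecutive failures. Simplified illustration only.
struct HealthAwarePicker {
    endpoints: Vec<String>,
    failures: Vec<u32>,
    down_until: Vec<Option<Instant>>,
    next: usize,
    threshold: u32,
    cooldown: Duration,
}

impl HealthAwarePicker {
    fn new(endpoints: Vec<String>, threshold: u32, cooldown: Duration) -> Self {
        let n = endpoints.len();
        Self { endpoints, failures: vec![0; n], down_until: vec![None; n], next: 0, threshold, cooldown }
    }

    /// Round-robin over endpoints that are not cooling down at `now`.
    /// Returns the endpoint index so the caller can report the outcome,
    /// or `None` if every endpoint is cooling down (or there are none).
    fn pick(&mut self, now: Instant) -> Option<usize> {
        let n = self.endpoints.len();
        for _ in 0..n {
            let i = self.next;
            self.next = (self.next + 1) % n;
            match self.down_until[i] {
                Some(until) if now < until => continue, // still cooling down
                _ => return Some(i),
            }
        }
        None
    }

    /// Record the result of a request sent to endpoint `i`.
    fn report(&mut self, i: usize, ok: bool, now: Instant) {
        if ok {
            // Any success fully restores the endpoint.
            self.failures[i] = 0;
            self.down_until[i] = None;
        } else {
            self.failures[i] += 1;
            if self.failures[i] >= self.threshold {
                self.down_until[i] = Some(now + self.cooldown);
            }
        }
    }
}
```

Passing `now` explicitly keeps the picker deterministic in tests. Once the cooldown expires, the endpoint receives traffic again. A further failure puts it straight back into cooldown, because its failure count is only reset by a success.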

Health Monitoring

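// Resolves as soon as any endpoint's healthcheck succeeds and fails only if
// all of them fail. `select_ok` panics when given an empty iterator, so
// `commons` must contain at least one endpoint.
let healthcheck = futures::future::select_ok(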
    commons
        .into_iter()
        .map(move |common| common.healthcheck(client.clone()).boxed()),
)
.map_ok(|((), _)| ())
.boxed();
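The healthcheck above uses select_ok, so it passes as soon as any resolved IP is healthy. Whether that rule or a stricter all-must-pass rule better fits the goal of checking all resolved IPs is a design choice. This std-only sketch contrasts the two aggregation rules on already-collected results; the function names are hypothetical.

```rust
/// Collect the error messages from per-endpoint healthcheck results.
fn errors(results: &[Result<(), String>]) -> Vec<&str> {
    results
        .iter()
        .filter_map(|r| r.as_ref().err().map(String::as_str))
        .collect()
}

/// Healthy if at least one endpoint passed (the success condition of `select_ok`).
/// Unlike `select_ok`, an empty input is reported as an error instead of panicking.
fn healthy_if_any(results: &[Result<(), String>]) -> Result<(), String> {
    if results.is_empty() {
        Err("no endpoints to check".to_string())
    } else if results.iter().any(|r| r.is_ok()) {
        Ok(())
    } else {
        Err(errors(results).join("; "))
    }
}

/// Healthy only if every endpoint passed.
fn healthy_if_all(results: &[Result<(), String>]) -> Result<(), String> {
    let failed = errors(results);
    if results.is_empty() {
        Err("no endpoints to check".to_string())
    } else if failed.is_empty() {
        Ok(())
    } else {
        Err(failed.join("; "))
    }
}
```

The any-rule lets the sink start while part of the cluster is unreachable. The all-rule surfaces a single bad IP at startup, at the cost of blocking startup on it.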

Proposal

No response

References

No response

Version

No response


Labels: sink: clickhouse, type: feature
