Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,7 @@ func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) {
if err == nil {
t.Fatal("EmptyCall() succeeded when expected to fail")
}
if status.Code(err) == codes.Unavailable && strings.Contains(err.Error(), "produced zero addresses") {
if status.Code(err) == codes.Unavailable && strings.Contains(err.Error(), "no targets to pick from") {
break
}
}
Expand Down
135 changes: 135 additions & 0 deletions internal/xds/balancer/cdsbalancer/e2e_test/dns_impl_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright 2026 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package e2e_test

import (
"context"
"fmt"
"testing"

"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"

v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
testpb "google.golang.org/grpc/interop/grpc_testing"

_ "google.golang.org/grpc/internal/xds/balancer/clusterresolver"
)

// TestLogicalDNS_MultipleEndpoints tests the cluster_resolver LB policy
// using a LOGICAL_DNS discovery mechanism.
//
// The test verifies that multiple addresses returned by the DNS resolver are
// grouped into a single endpoint (as per gRFC A61). Because the round_robin
// LB policy sees only one endpoint, it should not rotate traffic between the
// addresses. Instead, the single endpoint is picked, and connects to the
// first address.
func (s) TestLogicalDNS_MultipleEndpoints(t *testing.T) {
// Spin up a management server to receive xDS resources from.
managementServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})

// Create bootstrap configuration pointing to the above management server.
nodeID := uuid.New().String()
bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, managementServer.Address)
resolverBuilder, err := internal.NewXDSResolverWithConfigForTesting.(func([]byte) (resolver.Builder, error))(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS resolver for testing: %v", err)
}

// Start backend servers which provide an implementation of the TestService.
server1 := stubserver.StartTestService(t, nil)
defer server1.Stop()
server2 := stubserver.StartTestService(t, nil)
defer server2.Stop()

// Register a manual resolver with the "dns" scheme to override DNS resolution.
// This global override is safe because connection to the xDS management
// server uses the passthrough scheme instead and therefore overriding
// the DNS resolver does not affect it in any way.
const dnsScheme = "dns"
dnsR := manual.NewBuilderWithScheme(dnsScheme)
originalDNS := resolver.Get("dns")
resolver.Register(dnsR)
t.Cleanup(func() { resolver.Register(originalDNS) })

// For LOGICAL_DNS, this updates the SINGLE endpoint to have 2 IPs.
dnsR.InitialState(resolver.State{
Endpoints: []resolver.Endpoint{{
Addresses: []resolver.Address{
{Addr: server1.Address},
{Addr: server2.Address},
}}},
})

const (
serviceName = "test-xds-service"
clusterName = "cluster-test-xds-service"
endpointsName = "endpoints-test-xds-service"
rdsName = "route-test-xds-service"
)

resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)},
Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, serviceName, clusterName)},
Clusters: []*v3clusterpb.Cluster{e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
ClusterName: clusterName,
ServiceName: endpointsName,
Type: e2e.ClusterTypeLogicalDNS,
DNSHostName: "dns",
DNSPort: uint32(8080),
Policy: e2e.LoadBalancingPolicyRoundRobin,
})},
Endpoints: nil,
}

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

if err := managementServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server: %v", err)
}

// Create a ClientConn and make a successful RPC.
cc, err := grpc.NewClient(fmt.Sprintf("xds:///"+serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolverBuilder))
if err != nil {
t.Fatalf("Failed to create new client for local test server: %v", err)
}
defer cc.Close()

// Ensure the RPC is routed to the first backend.
testClient := testpb.NewTestServiceClient(cc)
for i := 0; i < 10; i++ {
var peer peer.Peer
if _, err := testClient.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil {
t.Fatalf("RPC failed: %v", err)
}

if got, want := peer.Addr.String(), server1.Address; got != want {
t.Errorf("peer.Addr = %q, want = %q", got, want)
}
}
}
9 changes: 2 additions & 7 deletions internal/xds/balancer/clusterimpl/tests/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -989,14 +989,9 @@ func (s) TestReResolutionAfterTransientFailure(t *testing.T) {
}

// Stopping the server listener will close the transport on the client,
// which will lead to the channel eventually moving to IDLE.
// which will lead to the channel eventually moving to TRANSIENT_FAILURE.
lis.Stop()
testutils.AwaitState(ctx, t, conn, connectivity.Idle)

// An RPC at this point is expected to fail with TRANSIENT_FAILURE.
if _, err = client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable {
t.Fatalf("EmptyCall RPC succeeded when the channel is in TRANSIENT_FAILURE, got %v want %v", err, codes.Unavailable)
}
testutils.AwaitState(ctx, t, conn, connectivity.TransientFailure)

// Expect resolver's ResolveNow to be called due to TF state.
select {
Expand Down
44 changes: 27 additions & 17 deletions internal/xds/balancer/clusterresolver/configbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"google.golang.org/grpc/internal/xds/balancer/outlierdetection"
"google.golang.org/grpc/internal/xds/balancer/priority"
"google.golang.org/grpc/internal/xds/balancer/wrrlocality"
"google.golang.org/grpc/internal/xds/clients"
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
"google.golang.org/grpc/resolver"
)
Expand Down Expand Up @@ -108,7 +109,7 @@ func buildPriorityConfig(priorities []priorityConfig, xdsLBPolicy *internalservi
}
continue
case DiscoveryMechanismTypeLogicalDNS:
name, config, endpoints := buildClusterImplConfigForDNS(p.childNameGen, p.endpoints, p.mechanism)
name, config, endpoints := buildClusterImplConfigForDNS(p.childNameGen, p.endpoints, p.mechanism, xdsLBPolicy)
retConfig.Priorities = append(retConfig.Priorities, name)
retEndpoints = append(retEndpoints, endpoints...)
odCfg := makeClusterImplOutlierDetectionChild(config, p.mechanism.outlierDetection)
Expand Down Expand Up @@ -138,27 +139,36 @@ func makeClusterImplOutlierDetectionChild(ciCfg *clusterimpl.LBConfig, odCfg out
return &odCfgRet
}

func buildClusterImplConfigForDNS(g *nameGenerator, endpoints []resolver.Endpoint, mechanism DiscoveryMechanism) (string, *clusterimpl.LBConfig, []resolver.Endpoint) {
// Endpoint picking policy for DNS is hardcoded to pick_first.
const childPolicy = "pick_first"
retEndpoints := make([]resolver.Endpoint, len(endpoints))
func buildClusterImplConfigForDNS(g *nameGenerator, endpoints []resolver.Endpoint, mechanism DiscoveryMechanism, xdsLBPolicy *internalserviceconfig.BalancerConfig) (string, *clusterimpl.LBConfig, []resolver.Endpoint) {
pName := fmt.Sprintf("priority-%v", g.prefix)
for i, e := range endpoints {
// For Logical DNS clusters, the same hostname attribute is added
// to all endpoints. It is set to the name that is resolved for the
// Logical DNS cluster, including the port number.
retEndpoints[i] = xdsresource.SetHostname(hierarchy.SetInEndpoint(e, []string{pName}), mechanism.DNSHostname)
// Copy the nested address field as slice fields are shared by the
// iteration variable and the original slice.
retEndpoints[i].Addresses = append([]resolver.Address{}, e.Addresses...)
}
return pName, &clusterimpl.LBConfig{
lbconfig := &clusterimpl.LBConfig{
Cluster: mechanism.Cluster,
TelemetryLabels: mechanism.TelemetryLabels,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicy},
ChildPolicy: xdsLBPolicy,
MaxConcurrentRequests: mechanism.MaxConcurrentRequests,
LoadReportingServer: mechanism.LoadReportingServer,
}, retEndpoints
}
if len(endpoints) == 0 {
return pName, lbconfig, nil
}
var retEndpoint resolver.Endpoint
for _, e := range endpoints {
// LOGICAL_DNS requires all resolved addresses to be grouped into a
// single logical endpoint. We iterate over the input endpoints and
// aggregate their addresses into a new endpoint variable.
retEndpoint.Addresses = append(retEndpoint.Addresses, e.Addresses...)
}
// Even though localities are not a thing for the LOGICAL_DNS cluster and
// its endpoint(s), we add an empty locality attribute here to ensure that
// LB policies that rely on locality information (like weighted_target)
// continue to work.
localityStr := xdsinternal.LocalityString(clients.Locality{})
retEndpoint = xdsresource.SetHostname(hierarchy.SetInEndpoint(retEndpoint, []string{pName, localityStr}), mechanism.DNSHostname)
// Set the locality weight to 1. This is required because the child policy
// like weighted_target which relies on locality weights to distribute
// traffic. These policies may drop traffic if the weight is 0.
retEndpoint = wrrlocality.SetAddrInfo(retEndpoint, wrrlocality.AddrInfo{LocalityWeight: 1})
return pName, lbconfig, []resolver.Endpoint{retEndpoint}
}

// buildClusterImplConfigForEDS returns a list of cluster_impl configs, one for
Expand Down
Loading