Skip to content

Commit 14e5c8f

Browse files
committed
resolving comments
1 parent c68ffd6 commit 14e5c8f

File tree

3 files changed

+245
-28
lines changed

3 files changed

+245
-28
lines changed

internal/xds/balancer/clusterresolver/configbuilder.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -140,17 +140,18 @@ func makeClusterImplOutlierDetectionChild(ciCfg *clusterimpl.LBConfig, odCfg out
140140
}
141141

142142
func buildClusterImplConfigForDNS(g *nameGenerator, endpoints []resolver.Endpoint, mechanism DiscoveryMechanism, xdsLBPolicy *internalserviceconfig.BalancerConfig) (string, *clusterimpl.LBConfig, []resolver.Endpoint) {
143-
retEndpoints := make([]resolver.Endpoint, len(endpoints))
143+
var retEndpoints []resolver.Endpoint
144144
pName := fmt.Sprintf("priority-%v", g.prefix)
145-
for i, e := range endpoints {
146-
// Use the canonical string representation for the locality to match
147-
// the keys expected by the parent Load Balancing policy.
145+
if len(endpoints) >= 1 {
146+
retEndpoints = make([]resolver.Endpoint, 1)
147+
for _, e := range endpoints {
148+
// Copy the nested address field as slice fields are shared by the
149+
// iteration variable and the original slice.
150+
retEndpoints[0].Addresses = append(retEndpoints[0].Addresses, e.Addresses...)
151+
}
148152
localityStr := xdsinternal.LocalityString(clients.Locality{})
149-
retEndpoints[i] = hierarchy.SetInEndpoint(e, []string{pName, localityStr})
150-
retEndpoints[i] = wrrlocality.SetAddrInfoInEndpoint(retEndpoints[i], wrrlocality.AddrInfo{LocalityWeight: 1})
151-
// Copy the nested address field as slice fields are shared by the
152-
// iteration variable and the original slice.
153-
retEndpoints[i].Addresses = append([]resolver.Address{}, e.Addresses...)
153+
retEndpoints[0] = hierarchy.SetInEndpoint(retEndpoints[0], []string{pName, localityStr})
154+
retEndpoints[0] = wrrlocality.SetAddrInfoInEndpoint(retEndpoints[0], wrrlocality.AddrInfo{LocalityWeight: 1})
154155
}
155156
return pName, &clusterimpl.LBConfig{
156157
Cluster: mechanism.Cluster,

internal/xds/balancer/clusterresolver/configbuilder_test.go

Lines changed: 80 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -305,35 +305,96 @@ func TestBuildPriorityConfig(t *testing.T) {
305305
}
306306
}
307307

308-
func testEndpointForDNS(addrStrs []string, localityWeight uint32, path []string) resolver.Endpoint {
309-
endpoint := resolver.Endpoint{}
310-
endpoint.Addresses = append(endpoint.Addresses, resolver.Address{Addr: addrStrs[0]})
311-
endpoint = hierarchy.SetInEndpoint(endpoint, path)
312-
endpoint = wrrlocality.SetAddrInfoInEndpoint(endpoint, wrrlocality.AddrInfo{LocalityWeight: localityWeight})
313-
return endpoint
308+
func testEndpointForDNS(endpoint []resolver.Endpoint, localityWeight uint32, path []string) resolver.Endpoint {
309+
retEndpoint := resolver.Endpoint{}
310+
for _, e := range endpoint {
311+
retEndpoint.Addresses = append(retEndpoint.Addresses, e.Addresses...)
312+
}
313+
retEndpoint = hierarchy.SetInEndpoint(retEndpoint, path)
314+
retEndpoint = wrrlocality.SetAddrInfoInEndpoint(retEndpoint, wrrlocality.AddrInfo{LocalityWeight: localityWeight})
315+
return retEndpoint
314316
}
315317

316318
func TestBuildClusterImplConfigForDNS(t *testing.T) {
317-
gotName, gotConfig, gotEndpoints := buildClusterImplConfigForDNS(newNameGenerator(3), testResolverEndpoints[0], DiscoveryMechanism{Cluster: testClusterName2, Type: DiscoveryMechanismTypeLogicalDNS}, nil)
318319
wantName := "priority-3"
319320
localityStr := xdsinternal.LocalityString(clients.Locality{})
320321
wantConfig := &clusterimpl.LBConfig{
321322
Cluster: testClusterName2,
322323
ChildPolicy: nil,
323324
}
324-
wantEndpoints := []resolver.Endpoint{
325-
testEndpointForDNS(testEndpoints[0][0].Addresses, 1, []string{wantName, localityStr}),
326-
testEndpointForDNS(testEndpoints[0][1].Addresses, 1, []string{wantName, localityStr}),
327-
}
325+
for _, tt := range []struct {
326+
name string
327+
endpoint []resolver.Endpoint
328+
wantEndpoint []resolver.Endpoint
329+
}{
330+
{
331+
name: "one endpoint with one address",
332+
endpoint: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "addr-0-0"}}}},
333+
wantEndpoint: []resolver.Endpoint{testEndpointForDNS([]resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "addr-0-0"}}}}, 1, []string{wantName, localityStr})},
334+
},
335+
{
336+
name: "one endpoint with multiple addresses",
337+
endpoint: []resolver.Endpoint{{Addresses: []resolver.Address{
338+
{Addr: "addr-0-0"},
339+
{Addr: "addr-0-1"},
340+
}}},
341+
wantEndpoint: []resolver.Endpoint{
342+
testEndpointForDNS([]resolver.Endpoint{{Addresses: []resolver.Address{
343+
{Addr: "addr-0-0"},
344+
{Addr: "addr-0-1"},
345+
}}}, 1, []string{wantName, localityStr}),
346+
},
347+
},
348+
{
349+
name: "multiple endpoints, all with one address each",
350+
endpoint: []resolver.Endpoint{
351+
{Addresses: []resolver.Address{{Addr: "addr-0-0"}}},
352+
{Addresses: []resolver.Address{{Addr: "addr-0-1"}}},
353+
},
354+
wantEndpoint: []resolver.Endpoint{
355+
testEndpointForDNS([]resolver.Endpoint{
356+
{Addresses: []resolver.Address{{Addr: "addr-0-0"}}},
357+
{Addresses: []resolver.Address{{Addr: "addr-0-1"}}},
358+
}, 1, []string{wantName, localityStr}),
359+
},
360+
},
361+
{
362+
name: "multiple endpoints, all with multiple addresses",
363+
endpoint: []resolver.Endpoint{
364+
{Addresses: []resolver.Address{
365+
{Addr: "addr-0-0"},
366+
{Addr: "addr-0-1"},
367+
}},
368+
{Addresses: []resolver.Address{
369+
{Addr: "addr-1-0"},
370+
{Addr: "addr-1-1"},
371+
}},
372+
},
373+
wantEndpoint: []resolver.Endpoint{
374+
testEndpointForDNS([]resolver.Endpoint{
375+
{Addresses: []resolver.Address{
376+
{Addr: "addr-0-0"},
377+
{Addr: "addr-0-1"},
378+
}},
379+
{Addresses: []resolver.Address{
380+
{Addr: "addr-1-0"},
381+
{Addr: "addr-1-1"},
382+
}},
383+
}, 1, []string{wantName, localityStr}),
384+
},
385+
},
386+
} {
387+
gotName, gotConfig, gotEndpoints := buildClusterImplConfigForDNS(newNameGenerator(3), tt.endpoint, DiscoveryMechanism{Cluster: testClusterName2, Type: DiscoveryMechanismTypeLogicalDNS}, nil)
328388

329-
if diff := cmp.Diff(gotName, wantName); diff != "" {
330-
t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff)
331-
}
332-
if diff := cmp.Diff(gotConfig, wantConfig); diff != "" {
333-
t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff)
334-
}
335-
if diff := cmp.Diff(gotEndpoints, wantEndpoints, endpointCmpOpts); diff != "" {
336-
t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff)
389+
if diff := cmp.Diff(gotName, wantName); diff != "" {
390+
t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff)
391+
}
392+
if diff := cmp.Diff(gotConfig, wantConfig); diff != "" {
393+
t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff)
394+
}
395+
if diff := cmp.Diff(gotEndpoints, tt.wantEndpoint, endpointCmpOpts); diff != "" {
396+
t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff)
397+
}
337398
}
338399
}
339400

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/*
2+
* Copyright 2025 gRPC authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package e2e_test
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"testing"
23+
"time"
24+
25+
"github.com/google/uuid"
26+
"google.golang.org/grpc"
27+
"google.golang.org/grpc/credentials/insecure"
28+
"google.golang.org/grpc/internal"
29+
"google.golang.org/grpc/internal/stubserver"
30+
"google.golang.org/grpc/internal/testutils/xds/e2e"
31+
"google.golang.org/grpc/internal/xds/bootstrap"
32+
"google.golang.org/grpc/internal/xds/xdsclient"
33+
"google.golang.org/grpc/peer"
34+
"google.golang.org/grpc/resolver"
35+
"google.golang.org/grpc/resolver/manual"
36+
"google.golang.org/grpc/serviceconfig"
37+
38+
testpb "google.golang.org/grpc/interop/grpc_testing"
39+
40+
_ "google.golang.org/grpc/internal/xds/balancer/clusterresolver"
41+
)
42+
43+
// TestLogicalDNS_MultipleEndpoints tests the cluster_resolver LB policy
44+
// using a LOGICAL_DNS discovery mechanism.
45+
//
46+
// The test verifies that multiple addresses returned by the DNS resolver are
47+
// grouped into a single endpoint (as per gRFC A61). Because the round_robin
48+
// LB policy (configured via xdsLbPolicy) sees only one endpoint, it should
49+
// not rotate traffic between the addresses. Instead, the single endpoint
50+
// (which contains all addresses) is picked, and connects to the first address.
51+
func (s) TestLogicalDNS_MultipleEndpoints(t *testing.T) {
52+
// Spin up a management server to receive xDS resources from.
53+
managementServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})
54+
55+
// Create bootstrap configuration pointing to the above management server.
56+
nodeID := uuid.New().String()
57+
bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, managementServer.Address)
58+
59+
// Start backend servers which provide an implementation of the TestService.
60+
server1 := stubserver.StartTestService(t, nil)
61+
defer server1.Stop()
62+
server2 := stubserver.StartTestService(t, nil)
63+
defer server2.Stop()
64+
65+
// Mock the DNS Resolver
66+
const dnsScheme = "dns"
67+
dnsR := manual.NewBuilderWithScheme(dnsScheme)
68+
originalDNS := resolver.Get("dns")
69+
resolver.Register(dnsR)
70+
t.Cleanup(func() { resolver.Register(originalDNS) })
71+
72+
// Capture the ClientConn created by the cluster_resolver so we can push updates.
73+
// We use a channel to synchronize access and avoid race conditions.
74+
dnsCCCh := make(chan resolver.ClientConn, 1)
75+
dnsR.BuildCallback = func(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) {
76+
select {
77+
case dnsCCCh <- cc:
78+
default:
79+
}
80+
}
81+
82+
// Create an xDS client for use by the cluster_resolver LB policy.
83+
config, err := bootstrap.NewConfigFromContents(bootstrapContents)
84+
if err != nil {
85+
t.Fatalf("Failed to parse bootstrap: %v", err)
86+
}
87+
pool := xdsclient.NewPool(config)
88+
xdsC, closeClient, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{
89+
Name: t.Name(),
90+
})
91+
if err != nil {
92+
t.Fatalf("Failed to create xDS client: %v", err)
93+
}
94+
defer closeClient()
95+
96+
// Create a manual resolver and push service config specifying the use of
97+
// the cluster_resolver LB policy with LOGICAL_DNS discovery mechanism.
98+
r := manual.NewBuilderWithScheme("whatever")
99+
jsonSC := fmt.Sprintf(`{
100+
"loadBalancingConfig":[{
101+
"cluster_resolver_experimental":{
102+
"discoveryMechanisms": [{
103+
"cluster": "test-cluster",
104+
"type": "LOGICAL_DNS",
105+
"dnsHostname": "%s:///target-name",
106+
"outlierDetection": {}
107+
}],
108+
"xdsLbPolicy":[{"round_robin":{}}]
109+
}
110+
}]
111+
}`, dnsScheme)
112+
113+
scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
114+
r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsC))
115+
116+
// Create a ClientConn and make a successful RPC.
117+
cc, err := grpc.NewClient(r.Scheme()+":///test.service",
118+
grpc.WithTransportCredentials(insecure.NewCredentials()),
119+
grpc.WithResolvers(r),
120+
)
121+
cc.Connect()
122+
if err != nil {
123+
t.Fatalf("failed to create new client for local test server: %v", err)
124+
}
125+
defer cc.Close()
126+
127+
testClient := testpb.NewTestServiceClient(cc)
128+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
129+
defer cancel()
130+
131+
var dnsClientConn resolver.ClientConn
132+
select {
133+
case dnsClientConn = <-dnsCCCh:
134+
case <-ctx.Done():
135+
t.Fatal("Timeout waiting for cluster_resolver to build the DNS resolver")
136+
}
137+
138+
// For LOGICAL_DNS, this updates the SINGLE endpoint to have 2 IPs.
139+
dnsClientConn.UpdateState(resolver.State{
140+
Addresses: []resolver.Address{
141+
{Addr: server1.Address},
142+
{Addr: server2.Address},
143+
},
144+
})
145+
146+
// Ensure the RPC is routed to the first backend.
147+
var peer peer.Peer
148+
if _, err := testClient.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil {
149+
t.Fatalf("RPC failed: %v", err)
150+
}
151+
152+
if got, want := peer.Addr.String(), server1.Address; got != want {
153+
t.Errorf("peer.Addr = %q, want = %q", got, want)
154+
}
155+
}

0 commit comments

Comments
 (0)