Skip to content

Commit 7ae1f88

Browse files
committed
backendcluster: route PD and backend traffic through cluster DNS
1 parent 623fdb8 commit 7ae1f88

File tree

21 files changed

+915
-40
lines changed

21 files changed

+915
-40
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ require (
3838
go.uber.org/mock v0.5.2
3939
go.uber.org/ratelimit v0.2.0
4040
go.uber.org/zap v1.27.0
41+
golang.org/x/net v0.48.0
4142
google.golang.org/grpc v1.63.2
4243
)
4344

@@ -272,7 +273,6 @@ require (
272273
golang.org/x/crypto v0.47.0 // indirect
273274
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect
274275
golang.org/x/mod v0.31.0 // indirect
275-
golang.org/x/net v0.48.0 // indirect
276276
golang.org/x/oauth2 v0.30.0 // indirect
277277
golang.org/x/sync v0.19.0 // indirect
278278
golang.org/x/sys v0.40.0 // indirect

pkg/balance/observer/health_check.go

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ import (
1919
"go.uber.org/zap"
2020
)
2121

22+
type BackendNetwork interface {
23+
HTTPClient(clusterName string) *http.Client
24+
DialContext(ctx context.Context, network, addr, clusterName string) (net.Conn, error)
25+
}
26+
2227
// HealthCheck is used to check the backends of one backend. One can pass a customized health check function to the observer.
2328
type HealthCheck interface {
2429
Check(ctx context.Context, backendID string, info *BackendInfo, lastHealth *BackendHealth) *BackendHealth
@@ -48,20 +53,44 @@ type security struct {
4853
type DefaultHealthCheck struct {
4954
cfg *config.HealthCheck
5055
logger *zap.Logger
51-
httpCli *http.Client
56+
network BackendNetwork
5257
}
5358

5459
func NewDefaultHealthCheck(httpCli *http.Client, cfg *config.HealthCheck, logger *zap.Logger) *DefaultHealthCheck {
55-
if httpCli == nil {
56-
httpCli = http.NewHTTPClient(func() *tls.Config { return nil })
60+
return NewDefaultHealthCheckWithNetwork(newDefaultBackendNetwork(httpCli), cfg, logger)
61+
}
62+
63+
func NewDefaultHealthCheckWithNetwork(network BackendNetwork, cfg *config.HealthCheck, logger *zap.Logger) *DefaultHealthCheck {
64+
if network == nil {
65+
network = newDefaultBackendNetwork(nil)
5766
}
5867
return &DefaultHealthCheck{
59-
httpCli: httpCli,
68+
network: network,
6069
cfg: cfg,
6170
logger: logger,
6271
}
6372
}
6473

74+
type defaultBackendNetwork struct {
75+
httpCli *http.Client
76+
}
77+
78+
func newDefaultBackendNetwork(httpCli *http.Client) *defaultBackendNetwork {
79+
if httpCli == nil {
80+
httpCli = http.NewHTTPClient(func() *tls.Config { return nil })
81+
}
82+
return &defaultBackendNetwork{httpCli: httpCli}
83+
}
84+
85+
func (n *defaultBackendNetwork) HTTPClient(string) *http.Client {
86+
return n.httpCli
87+
}
88+
89+
func (n *defaultBackendNetwork) DialContext(ctx context.Context, network, addr, _ string) (net.Conn, error) {
90+
var dialer net.Dialer
91+
return dialer.DialContext(ctx, network, addr)
92+
}
93+
6594
func (dhc *DefaultHealthCheck) Check(ctx context.Context, _ string, info *BackendInfo, lastBh *BackendHealth) *BackendHealth {
6695
bh := &BackendHealth{
6796
BackendInfo: *info,
@@ -96,10 +125,13 @@ func (dhc *DefaultHealthCheck) checkSqlPort(ctx context.Context, info *BackendIn
96125
return
97126
}
98127
addr := info.Addr
128+
clusterName := info.ClusterName
99129
b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(dhc.cfg.RetryInterval), uint64(dhc.cfg.MaxRetries)), ctx)
100130
err := http.ConnectWithRetry(func() error {
101131
startTime := time.Now()
102-
conn, err := net.DialTimeout("tcp", addr, dhc.cfg.DialTimeout)
132+
dialCtx, cancel := context.WithTimeout(ctx, dhc.cfg.DialTimeout)
133+
conn, err := dhc.network.DialContext(dialCtx, "tcp", addr, clusterName)
134+
cancel()
103135
setPingBackendMetrics(addr, startTime)
104136
if err != nil {
105137
return err
@@ -134,7 +166,8 @@ func (dhc *DefaultHealthCheck) checkStatusPort(ctx context.Context, info *Backen
134166

135167
addr := net.JoinHostPort(info.IP, strconv.Itoa(int(info.StatusPort)))
136168
b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(dhc.cfg.RetryInterval), uint64(dhc.cfg.MaxRetries)), ctx)
137-
resp, err := dhc.httpCli.Get(addr, statusPathSuffix, b, dhc.cfg.DialTimeout)
169+
clusterName := info.ClusterName
170+
resp, err := dhc.network.HTTPClient(clusterName).Get(addr, statusPathSuffix, b, dhc.cfg.DialTimeout)
138171
if err == nil {
139172
var respBody backendHttpStatusRespBody
140173
err = json.Unmarshal(resp, &respBody)
@@ -176,7 +209,8 @@ func (dhc *DefaultHealthCheck) queryConfig(ctx context.Context, info *BackendInf
176209

177210
b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(dhc.cfg.RetryInterval), uint64(dhc.cfg.MaxRetries)), ctx)
178211
var resp []byte
179-
if resp, err = dhc.httpCli.Get(addr, configPathSuffix, b, dhc.cfg.DialTimeout); err != nil {
212+
clusterName := info.ClusterName
213+
if resp, err = dhc.network.HTTPClient(clusterName).Get(addr, configPathSuffix, b, dhc.cfg.DialTimeout); err != nil {
180214
return
181215
}
182216
var respBody backendHttpConfigRespBody

pkg/balance/observer/health_check_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@ package observer
55

66
import (
77
"context"
8+
"crypto/tls"
89
"encoding/json"
910
"net"
1011
"net/http"
1112
"strings"
13+
"sync"
1214
"sync/atomic"
1315
"testing"
1416
"time"
@@ -17,6 +19,7 @@ import (
1719
"github.com/pingcap/tiproxy/lib/util/logger"
1820
"github.com/pingcap/tiproxy/lib/util/waitgroup"
1921
"github.com/pingcap/tiproxy/pkg/testkit"
22+
httputil "github.com/pingcap/tiproxy/pkg/util/http"
2023
"github.com/stretchr/testify/require"
2124
)
2225

@@ -120,6 +123,59 @@ func TestSupportRedirection(t *testing.T) {
120123
require.False(t, health.SupportRedirection)
121124
}
122125

126+
func TestHealthCheckUsesClusterNetwork(t *testing.T) {
127+
lg, _ := logger.CreateLoggerForTest(t)
128+
cfg := newHealthCheckConfigForTest()
129+
backend, info := newBackendServer(t)
130+
defer backend.close()
131+
backend.setServerVersion("1.0")
132+
backend.setHasSigningCert(true)
133+
info.ClusterName = "cluster-a"
134+
135+
network := &mockBackendNetwork{
136+
httpCli: httputil.NewHTTPClient(func() *tls.Config { return nil }),
137+
}
138+
hc := NewDefaultHealthCheckWithNetwork(network, cfg, lg)
139+
health := hc.Check(context.Background(), backend.sqlAddr, info, nil)
140+
require.True(t, health.Healthy)
141+
require.Contains(t, network.httpClusters(), "cluster-a")
142+
require.Contains(t, network.dialClusters(), "cluster-a")
143+
}
144+
145+
type mockBackendNetwork struct {
146+
httpCli *httputil.Client
147+
mu sync.Mutex
148+
https []string
149+
dials []string
150+
}
151+
152+
func (n *mockBackendNetwork) HTTPClient(clusterName string) *httputil.Client {
153+
n.mu.Lock()
154+
n.https = append(n.https, clusterName)
155+
n.mu.Unlock()
156+
return n.httpCli
157+
}
158+
159+
func (n *mockBackendNetwork) DialContext(ctx context.Context, network, addr, clusterName string) (net.Conn, error) {
160+
n.mu.Lock()
161+
n.dials = append(n.dials, clusterName)
162+
n.mu.Unlock()
163+
var dialer net.Dialer
164+
return dialer.DialContext(ctx, network, addr)
165+
}
166+
167+
func (n *mockBackendNetwork) httpClusters() []string {
168+
n.mu.Lock()
169+
defer n.mu.Unlock()
170+
return append([]string(nil), n.https...)
171+
}
172+
173+
func (n *mockBackendNetwork) dialClusters() []string {
174+
n.mu.Lock()
175+
defer n.mu.Unlock()
176+
return append([]string(nil), n.dials...)
177+
}
178+
123179
type backendServer struct {
124180
t *testing.T
125181
sqlListener net.Listener

pkg/balance/router/router.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ type BackendInst interface {
8080
Healthy() bool
8181
Local() bool
8282
Keyspace() string
83+
ClusterName() string
8384
}
8485

8586
// backendWrapper contains the connections on the backend.

pkg/balance/router/router_static.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ func (r *StaticRouter) OnConnClosed(backendID, redirectingBackendID string, conn
8282
type StaticBackend struct {
8383
addr string
8484
keyspace string
85+
cluster string
8586
healthy atomic.Bool
8687
}
8788

@@ -120,3 +121,7 @@ func (b *StaticBackend) Keyspace() string {
120121
func (b *StaticBackend) SetKeyspace(k string) {
121122
b.keyspace = k
122123
}
124+
125+
func (b *StaticBackend) ClusterName() string {
126+
return b.cluster
127+
}

pkg/manager/backendcluster/manager.go

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"crypto/tls"
99
"maps"
10+
"net"
1011
"slices"
1112
"strings"
1213
"sync"
@@ -16,7 +17,8 @@ import (
1617
"github.com/pingcap/tiproxy/pkg/balance/metricsreader"
1718
"github.com/pingcap/tiproxy/pkg/manager/infosync"
1819
"github.com/pingcap/tiproxy/pkg/util/etcd"
19-
"github.com/pingcap/tiproxy/pkg/util/http"
20+
httputil "github.com/pingcap/tiproxy/pkg/util/http"
21+
"github.com/pingcap/tiproxy/pkg/util/netutil"
2022
"github.com/pingcap/tiproxy/pkg/util/waitgroup"
2123
clientv3 "go.etcd.io/etcd/client/v3"
2224
"go.uber.org/zap"
@@ -28,6 +30,8 @@ type Cluster struct {
2830
etcdCli *clientv3.Client
2931
infoSyncer *infosync.InfoSyncer
3032
metrics *metricsreader.ClusterReader
33+
httpCli *httputil.Client
34+
dialer *netutil.DNSDialer
3135
}
3236

3337
func (c *Cluster) Config() config.BackendCluster {
@@ -46,6 +50,14 @@ func (c *Cluster) GetPromInfo(ctx context.Context) (*infosync.PrometheusInfo, er
4650
return c.infoSyncer.GetPromInfo(ctx)
4751
}
4852

53+
func (c *Cluster) HTTPClient() *httputil.Client {
54+
return c.httpCli
55+
}
56+
57+
func (c *Cluster) DialContext(ctx context.Context, network, addr string) (net.Conn, error) {
58+
return c.dialer.DialContext(ctx, network, addr)
59+
}
60+
4961
type Manager struct {
5062
lg *zap.Logger
5163
clusterTLS func() *tls.Config
@@ -54,6 +66,7 @@ type Manager struct {
5466
wg waitgroup.WaitGroup
5567
cancel context.CancelFunc
5668
metrics *MetricsQuerier
69+
network *NetworkRouter
5770

5871
mu struct {
5972
sync.RWMutex
@@ -68,6 +81,7 @@ func NewManager(lg *zap.Logger, clusterTLS func() *tls.Config) *Manager {
6881
}
6982
mgr.mu.clusters = make(map[string]*Cluster)
7083
mgr.metrics = NewMetricsQuerier(mgr)
84+
mgr.network = NewNetworkRouter(mgr, clusterTLS)
7185
return mgr
7286
}
7387

@@ -195,10 +209,18 @@ func clusterReusable(cluster *Cluster, cfg config.BackendCluster) bool {
195209

196210
func (m *Manager) buildCluster(ctx context.Context, cfg *config.Config, clusterCfg config.BackendCluster) (*Cluster, error) {
197211
clusterCfg = normalizeCluster(clusterCfg)
198-
etcdCli, err := etcd.InitEtcdClientWithAddrs(
212+
nameServers, err := config.ParseNSServers(clusterCfg.NSServers)
213+
if err != nil {
214+
return nil, err
215+
}
216+
dialer := netutil.NewDNSDialer(nameServers)
217+
httpCli := httputil.NewHTTPClientWithDialContext(m.clusterTLS, dialer.DialContext)
218+
219+
etcdCli, err := etcd.InitEtcdClientWithAddrsAndDialer(
199220
m.lg.With(zap.String("cluster", clusterCfg.Name)).Named("etcd"),
200221
clusterCfg.PDAddrs,
201222
m.clusterTLS(),
223+
dialer,
202224
)
203225
if err != nil {
204226
return nil, err
@@ -217,13 +239,15 @@ func (m *Manager) buildCluster(ctx context.Context, cfg *config.Config, clusterC
217239
cfg: clusterCfg,
218240
etcdCli: etcdCli,
219241
infoSyncer: infoSyncer,
242+
httpCli: httpCli,
243+
dialer: dialer,
220244
}
221245
cluster.metrics = metricsreader.NewClusterReader(
222246
m.lg.With(zap.String("cluster", clusterCfg.Name)).Named("metrics"),
223247
clusterCfg.Name,
224248
cluster,
225249
cluster,
226-
http.NewHTTPClient(m.clusterTLS),
250+
httpCli,
227251
etcdCli,
228252
config.NewDefaultHealthCheckConfig(),
229253
m.cfgGetter,
@@ -277,6 +301,11 @@ func (m *Manager) HasBackendClusters() bool {
277301
func (m *Manager) MetricsQuerier() *MetricsQuerier {
278302
return m.metrics
279303
}
304+
305+
func (m *Manager) NetworkRouter() *NetworkRouter {
306+
return m.network
307+
}
308+
280309
// PrimaryCluster returns the only configured cluster when the cluster count is exactly one.
281310
// It exists for features that are only well-defined in the single-cluster case, such as VIP.
282311
func (m *Manager) PrimaryCluster() *Cluster {

0 commit comments

Comments
 (0)