Skip to content

Commit 9db8cd0

Browse files
committed
backendcluster: route PD and backend traffic through cluster DNS
Signed-off-by: Yang Keao <yangkeao@chunibyo.icu>
1 parent caad2ab commit 9db8cd0

File tree

22 files changed

+927
-42
lines changed

22 files changed

+927
-42
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ require (
3838
go.uber.org/mock v0.5.2
3939
go.uber.org/ratelimit v0.2.0
4040
go.uber.org/zap v1.27.0
41+
golang.org/x/net v0.48.0
4142
google.golang.org/grpc v1.63.2
4243
)
4344

@@ -272,7 +273,6 @@ require (
272273
golang.org/x/crypto v0.47.0 // indirect
273274
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect
274275
golang.org/x/mod v0.31.0 // indirect
275-
golang.org/x/net v0.48.0 // indirect
276276
golang.org/x/oauth2 v0.30.0 // indirect
277277
golang.org/x/sync v0.19.0 // indirect
278278
golang.org/x/sys v0.40.0 // indirect

pkg/balance/observer/health_check.go

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ import (
1919
"go.uber.org/zap"
2020
)
2121

22+
// BackendNetwork provides the network access used by health checks: an HTTP
// client and a raw connection dialer. Both methods receive the cluster name of
// the backend being checked; what an implementation does with it is up to the
// implementation.
type BackendNetwork interface {
23+
// HTTPClient returns the HTTP client to use for backends of clusterName.
HTTPClient(clusterName string) *http.Client
24+
// DialContext opens a connection to addr on the given network for a backend
// of clusterName, honoring ctx.
DialContext(ctx context.Context, network, addr, clusterName string) (net.Conn, error)
25+
}
26+
2227
// HealthCheck is used to check the health of one backend. One can pass a customized health check function to the observer.
2328
type HealthCheck interface {
2429
Check(ctx context.Context, info *BackendInfo, lastHealth *BackendHealth) *BackendHealth
@@ -48,20 +53,44 @@ type security struct {
4853
type DefaultHealthCheck struct {
4954
cfg *config.HealthCheck
5055
logger *zap.Logger
51-
httpCli *http.Client
56+
network BackendNetwork
5257
}
5358

5459
func NewDefaultHealthCheck(httpCli *http.Client, cfg *config.HealthCheck, logger *zap.Logger) *DefaultHealthCheck {
55-
if httpCli == nil {
56-
httpCli = http.NewHTTPClient(func() *tls.Config { return nil })
60+
return NewDefaultHealthCheckWithNetwork(newDefaultBackendNetwork(httpCli), cfg, logger)
61+
}
62+
63+
func NewDefaultHealthCheckWithNetwork(network BackendNetwork, cfg *config.HealthCheck, logger *zap.Logger) *DefaultHealthCheck {
64+
if network == nil {
65+
network = newDefaultBackendNetwork(nil)
5766
}
5867
return &DefaultHealthCheck{
59-
httpCli: httpCli,
68+
network: network,
6069
cfg: cfg,
6170
logger: logger,
6271
}
6372
}
6473

74+
// defaultBackendNetwork is the BackendNetwork used when the caller does not
// supply one. It is not cluster-aware: its methods ignore the cluster name.
type defaultBackendNetwork struct {
75+
// httpCli is the single HTTP client handed out for every cluster name.
httpCli *http.Client
76+
}
77+
78+
// newDefaultBackendNetwork wraps httpCli in a defaultBackendNetwork.
// When httpCli is nil, a new client is created with a TLS config getter that
// always returns nil.
func newDefaultBackendNetwork(httpCli *http.Client) *defaultBackendNetwork {
79+
if httpCli == nil {
80+
// NOTE(review): a nil *tls.Config presumably means plain HTTP — confirm against NewHTTPClient.
httpCli = http.NewHTTPClient(func() *tls.Config { return nil })
81+
}
82+
return &defaultBackendNetwork{httpCli: httpCli}
83+
}
84+
85+
// HTTPClient returns the shared HTTP client. The cluster name argument is
// unnamed and ignored.
func (n *defaultBackendNetwork) HTTPClient(string) *http.Client {
86+
return n.httpCli
87+
}
88+
89+
// DialContext connects to addr on the given network using a zero-value
// net.Dialer. The cluster name argument is ignored. The dialer configures no
// timeout of its own, so deadlines and cancellation come from ctx.
func (n *defaultBackendNetwork) DialContext(ctx context.Context, network, addr, _ string) (net.Conn, error) {
90+
var dialer net.Dialer
91+
return dialer.DialContext(ctx, network, addr)
92+
}
93+
6594
func (dhc *DefaultHealthCheck) Check(ctx context.Context, info *BackendInfo, lastBh *BackendHealth) *BackendHealth {
6695
bh := &BackendHealth{
6796
BackendInfo: *info,
@@ -96,10 +125,13 @@ func (dhc *DefaultHealthCheck) checkSqlPort(ctx context.Context, info *BackendIn
96125
return
97126
}
98127
addr := info.Addr
128+
clusterName := info.ClusterName
99129
b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(dhc.cfg.RetryInterval), uint64(dhc.cfg.MaxRetries)), ctx)
100130
err := http.ConnectWithRetry(func() error {
101131
startTime := time.Now()
102-
conn, err := net.DialTimeout("tcp", addr, dhc.cfg.DialTimeout)
132+
dialCtx, cancel := context.WithTimeout(ctx, dhc.cfg.DialTimeout)
133+
conn, err := dhc.network.DialContext(dialCtx, "tcp", addr, clusterName)
134+
cancel()
103135
setPingBackendMetrics(addr, startTime)
104136
if err != nil {
105137
return err
@@ -134,7 +166,8 @@ func (dhc *DefaultHealthCheck) checkStatusPort(ctx context.Context, info *Backen
134166

135167
addr := net.JoinHostPort(info.IP, strconv.Itoa(int(info.StatusPort)))
136168
b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(dhc.cfg.RetryInterval), uint64(dhc.cfg.MaxRetries)), ctx)
137-
resp, err := dhc.httpCli.Get(addr, statusPathSuffix, b, dhc.cfg.DialTimeout)
169+
clusterName := info.ClusterName
170+
resp, err := dhc.network.HTTPClient(clusterName).Get(addr, statusPathSuffix, b, dhc.cfg.DialTimeout)
138171
if err == nil {
139172
var respBody backendHttpStatusRespBody
140173
err = json.Unmarshal(resp, &respBody)
@@ -176,7 +209,8 @@ func (dhc *DefaultHealthCheck) queryConfig(ctx context.Context, info *BackendInf
176209

177210
b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(dhc.cfg.RetryInterval), uint64(dhc.cfg.MaxRetries)), ctx)
178211
var resp []byte
179-
if resp, err = dhc.httpCli.Get(addr, configPathSuffix, b, dhc.cfg.DialTimeout); err != nil {
212+
clusterName := info.ClusterName
213+
if resp, err = dhc.network.HTTPClient(clusterName).Get(addr, configPathSuffix, b, dhc.cfg.DialTimeout); err != nil {
180214
return
181215
}
182216
var respBody backendHttpConfigRespBody

pkg/balance/observer/health_check_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@ package observer
55

66
import (
77
"context"
8+
"crypto/tls"
89
"encoding/json"
910
"net"
1011
"net/http"
1112
"strings"
13+
"sync"
1214
"sync/atomic"
1315
"testing"
1416
"time"
@@ -17,6 +19,7 @@ import (
1719
"github.com/pingcap/tiproxy/lib/util/logger"
1820
"github.com/pingcap/tiproxy/lib/util/waitgroup"
1921
"github.com/pingcap/tiproxy/pkg/testkit"
22+
httputil "github.com/pingcap/tiproxy/pkg/util/http"
2023
"github.com/stretchr/testify/require"
2124
)
2225

@@ -120,6 +123,59 @@ func TestSupportRedirection(t *testing.T) {
120123
require.False(t, health.SupportRedirection)
121124
}
122125

126+
// TestHealthCheckUsesClusterNetwork verifies that a health check on a backend
// whose ClusterName is "cluster-a" passes that name to both HTTPClient and
// DialContext of the injected BackendNetwork, and that the backend is reported
// healthy.
func TestHealthCheckUsesClusterNetwork(t *testing.T) {
127+
lg, _ := logger.CreateLoggerForTest(t)
128+
cfg := newHealthCheckConfigForTest()
129+
backend, info := newBackendServer(t)
130+
defer backend.close()
131+
backend.setServerVersion("1.0")
132+
backend.setHasSigningCert(true)
133+
// Tag the backend with a cluster name so the assertions below can look for it.
info.ClusterName = "cluster-a"
134+
135+
// The mock records every cluster name it is asked about.
network := &mockBackendNetwork{
136+
httpCli: httputil.NewHTTPClient(func() *tls.Config { return nil }),
137+
}
138+
hc := NewDefaultHealthCheckWithNetwork(network, cfg, lg)
139+
health := hc.Check(context.Background(), info, nil)
140+
require.True(t, health.Healthy)
141+
require.Contains(t, network.httpClusters(), "cluster-a")
142+
require.Contains(t, network.dialClusters(), "cluster-a")
143+
}
144+
145+
// mockBackendNetwork is a test double that records the cluster names passed
// to HTTPClient and DialContext.
type mockBackendNetwork struct {
146+
// httpCli is the client returned by HTTPClient.
httpCli *httputil.Client
147+
// mu guards https and dials.
mu sync.Mutex
148+
// https holds the cluster names passed to HTTPClient, in call order.
https []string
149+
// dials holds the cluster names passed to DialContext, in call order.
dials []string
150+
}
151+
152+
// HTTPClient records clusterName under the lock and returns the configured client.
func (n *mockBackendNetwork) HTTPClient(clusterName string) *httputil.Client {
153+
n.mu.Lock()
154+
n.https = append(n.https, clusterName)
155+
n.mu.Unlock()
156+
return n.httpCli
157+
}
158+
159+
// DialContext records clusterName under the lock, then dials addr with a
// zero-value net.Dialer. The lock is released before dialing.
func (n *mockBackendNetwork) DialContext(ctx context.Context, network, addr, clusterName string) (net.Conn, error) {
160+
n.mu.Lock()
161+
n.dials = append(n.dials, clusterName)
162+
n.mu.Unlock()
163+
var dialer net.Dialer
164+
return dialer.DialContext(ctx, network, addr)
165+
}
166+
167+
// httpClusters returns a copy of the cluster names recorded by HTTPClient.
func (n *mockBackendNetwork) httpClusters() []string {
168+
n.mu.Lock()
169+
defer n.mu.Unlock()
170+
// Appending to a nil slice copies the data, so the caller does not share n.https.
return append([]string(nil), n.https...)
171+
}
172+
173+
// dialClusters returns a copy of the cluster names recorded by DialContext.
func (n *mockBackendNetwork) dialClusters() []string {
174+
n.mu.Lock()
175+
defer n.mu.Unlock()
176+
// Appending to a nil slice copies the data, so the caller does not share n.dials.
return append([]string(nil), n.dials...)
177+
}
178+
123179
type backendServer struct {
124180
t *testing.T
125181
sqlListener net.Listener

pkg/balance/router/router.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ type BackendInst interface {
8080
Healthy() bool
8181
Local() bool
8282
Keyspace() string
83+
ClusterName() string
8384
}
8485

8586
// backendWrapper contains the connections on the backend.

pkg/balance/router/router_static.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ func (r *StaticRouter) OnConnClosed(backendID, redirectingBackendID string, conn
8282
type StaticBackend struct {
8383
addr string
8484
keyspace string
85+
// cluster is the value returned by ClusterName.
cluster string
8586
healthy atomic.Bool
8687
}
8788

@@ -120,3 +121,7 @@ func (b *StaticBackend) Keyspace() string {
120121
func (b *StaticBackend) SetKeyspace(k string) {
121122
b.keyspace = k
122123
}
124+
125+
// ClusterName returns the cluster name stored on the backend.
func (b *StaticBackend) ClusterName() string {
126+
return b.cluster
127+
}

pkg/manager/backendcluster/cluster.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@ package backendcluster
66
import (
77
"context"
88
"crypto/tls"
9+
"net"
910

1011
"github.com/pingcap/tiproxy/lib/config"
1112
"github.com/pingcap/tiproxy/lib/util/errors"
1213
"github.com/pingcap/tiproxy/pkg/balance/metricsreader"
1314
"github.com/pingcap/tiproxy/pkg/manager/infosync"
1415
"github.com/pingcap/tiproxy/pkg/util/etcd"
15-
"github.com/pingcap/tiproxy/pkg/util/http"
16+
httputil "github.com/pingcap/tiproxy/pkg/util/http"
17+
"github.com/pingcap/tiproxy/pkg/util/netutil"
1618
clientv3 "go.etcd.io/etcd/client/v3"
1719
"go.uber.org/zap"
1820
)
@@ -23,6 +25,8 @@ type Cluster struct {
2325
etcdCli *clientv3.Client
2426
infoSyncer *infosync.InfoSyncer
2527
metrics *metricsreader.ClusterReader
28+
httpCli *httputil.Client
29+
dialer *netutil.DNSDialer
2630
}
2731

2832
func (c *Cluster) Config() config.BackendCluster {
@@ -41,6 +45,14 @@ func (c *Cluster) GetPromInfo(ctx context.Context) (*infosync.PrometheusInfo, er
4145
return c.infoSyncer.GetPromInfo(ctx)
4246
}
4347

48+
// HTTPClient returns the HTTP client held by the cluster. NewCluster builds it
// with the cluster's DNS dialer as its dial function.
func (c *Cluster) HTTPClient() *httputil.Client {
49+
return c.httpCli
50+
}
51+
52+
// DialContext opens a connection to addr on the given network by delegating to
// the cluster's DNS dialer.
// NOTE(review): c.dialer is used without a nil check; a Cluster not created by
// NewCluster may have a nil dialer — confirm DNSDialer tolerates a nil receiver.
func (c *Cluster) DialContext(ctx context.Context, network, addr string) (net.Conn, error) {
53+
return c.dialer.DialContext(ctx, network, addr)
54+
}
55+
4456
func (c *Cluster) PreClose() {
4557
if c.metrics != nil {
4658
c.metrics.PreClose()
@@ -69,10 +81,18 @@ func NewCluster(
6981
metricsQuerier *MetricsQuerier,
7082
) (*Cluster, error) {
7183
clusterCfg = normalizeCluster(clusterCfg)
72-
etcdCli, err := etcd.InitEtcdClientWithAddrs(
84+
nameServers, err := config.ParseNSServers(clusterCfg.NSServers)
85+
if err != nil {
86+
return nil, err
87+
}
88+
dialer := netutil.NewDNSDialer(nameServers)
89+
httpCli := httputil.NewHTTPClientWithDialContext(clusterTLS, dialer.DialContext)
90+
91+
etcdCli, err := etcd.InitEtcdClientWithAddrsAndDialer(
7392
logger.With(zap.String("cluster", clusterCfg.Name)).Named("etcd"),
7493
clusterCfg.PDAddrs,
7594
clusterTLS(),
95+
dialer,
7696
)
7797
if err != nil {
7898
return nil, err
@@ -91,13 +111,15 @@ func NewCluster(
91111
cfg: clusterCfg,
92112
etcdCli: etcdCli,
93113
infoSyncer: infoSyncer,
114+
httpCli: httpCli,
115+
dialer: dialer,
94116
}
95117
cluster.metrics = metricsreader.NewClusterReader(
96118
logger.With(zap.String("cluster", clusterCfg.Name)).Named("metrics"),
97119
clusterCfg.Name,
98120
cluster,
99121
cluster,
100-
http.NewHTTPClient(clusterTLS),
122+
httpCli,
101123
etcdCli,
102124
config.NewDefaultHealthCheckConfig(),
103125
cfgGetter,

pkg/manager/backendcluster/manager.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type Manager struct {
2727
wg waitgroup.WaitGroup
2828
cancel context.CancelFunc
2929
metrics *MetricsQuerier
30+
network *NetworkRouter
3031

3132
mu struct {
3233
sync.RWMutex
@@ -41,6 +42,7 @@ func NewManager(lg *zap.Logger, clusterTLS func() *tls.Config) *Manager {
4142
}
4243
mgr.mu.clusters = make(map[string]*Cluster)
4344
mgr.metrics = NewMetricsQuerier(mgr)
45+
mgr.network = NewNetworkRouter(mgr, clusterTLS)
4446
return mgr
4547
}
4648

@@ -164,6 +166,7 @@ func clusterReusable(cluster *Cluster, cfg config.BackendCluster) bool {
164166
left.PDAddrs == right.PDAddrs &&
165167
slices.Equal(left.NSServers, right.NSServers)
166168
}
169+
167170
func (m *Manager) Snapshot() map[string]*Cluster {
168171
m.mu.RLock()
169172
snapshot := make(map[string]*Cluster, len(m.mu.clusters))
@@ -182,6 +185,10 @@ func (m *Manager) MetricsQuerier() *MetricsQuerier {
182185
return m.metrics
183186
}
184187

188+
// NetworkRouter returns the NetworkRouter that NewManager created for this manager.
func (m *Manager) NetworkRouter() *NetworkRouter {
189+
return m.network
190+
}
191+
185192
// PrimaryCluster returns the only configured cluster when the cluster count is exactly one.
186193
// It exists for features that are only well-defined in the single-cluster case, such as VIP.
187194
func (m *Manager) PrimaryCluster() *Cluster {

0 commit comments

Comments
 (0)