Skip to content

Commit 30d5b3f

Browse files
authored
backendcluster: add cluster manager and cluster-scoped topology runtime (#1104)
Signed-off-by: Yang Keao <yangkeao@chunibyo.icu>
1 parent 4d841da commit 30d5b3f

32 files changed

+1181
-148
lines changed

AGENTS.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ When adding or modifying features, prefer extending existing packages before cre
4747
- `pkg/manager/config/` - Auto-reloads configuration files and provides interfaces to query them.
4848
- `pkg/manager/elect/` - Manages TiProxy owner elections (for example, metrics reader and VIP modules need an owner).
4949
- `pkg/manager/id/` - Generates global IDs.
50+
- `pkg/manager/backendcluster/` - Manages cluster-scoped backend runtimes and shared resources such as PD or etcd clients.
5051
- `pkg/manager/infosync/` - Queries the topology of TiDB and Prometheus from PD and updates TiProxy information to PD.
5152
- `pkg/manager/logger/` - Manages the logger service.
5253
- `pkg/manager/memory/` - Records heap and goroutine profiles when memory usage is high.

lib/config/proxy.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,8 @@ func (ps *ProxyServer) Check() error {
282282
return nil
283283
}
284284

285-
func splitAddrList(addrs string) []string {
285+
// SplitAddrList splits a comma-separated address list, trims each address, and drops empty entries.
286+
func SplitAddrList(addrs string) []string {
286287
parts := strings.Split(addrs, ",")
287288
trimmed := make([]string, 0, len(parts))
288289
for _, part := range parts {
@@ -295,7 +296,7 @@ func splitAddrList(addrs string) []string {
295296
}
296297

297298
func validateAddrList(addrs, field string) error {
298-
parts := splitAddrList(addrs)
299+
parts := SplitAddrList(addrs)
299300
if len(parts) == 0 {
300301
return errors.Wrapf(ErrInvalidConfigValue, "%s is empty", field)
301302
}
@@ -345,7 +346,7 @@ func normalizeNSServer(server string) (string, error) {
345346
}
346347

347348
func (ps *ProxyServer) GetSQLAddrs() ([]string, error) {
348-
addrs := splitAddrList(ps.Addr)
349+
addrs := SplitAddrList(ps.Addr)
349350
if len(addrs) == 0 {
350351
if len(ps.PortRange) == 0 {
351352
return []string{""}, nil

pkg/balance/observer/backend_fetcher.go

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616

1717
var _ BackendFetcher = (*PDFetcher)(nil)
1818
var _ BackendFetcher = (*StaticFetcher)(nil)
19+
var _ BackendFetcher = (*FallbackFetcher)(nil)
1920

2021
// BackendFetcher is an interface to fetch the backend list.
2122
type BackendFetcher interface {
@@ -25,6 +26,8 @@ type BackendFetcher interface {
2526
// TopologyFetcher is an interface to fetch the tidb topology from ETCD.
2627
type TopologyFetcher interface {
2728
GetTiDBTopology(ctx context.Context) (map[string]*infosync.TiDBTopologyInfo, error)
29+
// HasBackendClusters reports whether dynamic PD-backed clusters are configured at all.
30+
HasBackendClusters() bool
2831
}
2932

3033
// PDFetcher fetches backend list from PD.
@@ -46,16 +49,41 @@ func NewPDFetcher(tpFetcher TopologyFetcher, logger *zap.Logger, config *config.
4649
func (pf *PDFetcher) GetBackendList(ctx context.Context) (map[string]*BackendInfo, error) {
4750
backends := pf.fetchBackendList(ctx)
4851
infos := make(map[string]*BackendInfo, len(backends))
49-
for addr, backend := range backends {
50-
infos[addr] = &BackendInfo{
51-
Labels: backend.Labels,
52-
IP: backend.IP,
53-
StatusPort: backend.StatusPort,
52+
for key, backend := range backends {
53+
infos[key] = &BackendInfo{
54+
Addr: backend.Addr,
55+
ClusterName: backend.ClusterName,
56+
Labels: backend.Labels,
57+
IP: backend.IP,
58+
StatusPort: backend.StatusPort,
5459
}
5560
}
5661
return infos, nil
5762
}
5863

64+
// FallbackFetcher preserves the legacy behavior that static backend.instances still work before
65+
// any backend cluster is configured, and automatically switches to PD-backed topology afterwards.
66+
type FallbackFetcher struct {
67+
tpFetcher TopologyFetcher
68+
dynamic BackendFetcher
69+
static BackendFetcher
70+
}
71+
72+
func NewFallbackFetcher(tpFetcher TopologyFetcher, dynamic BackendFetcher, static BackendFetcher) *FallbackFetcher {
73+
return &FallbackFetcher{
74+
tpFetcher: tpFetcher,
75+
dynamic: dynamic,
76+
static: static,
77+
}
78+
}
79+
80+
func (ff *FallbackFetcher) GetBackendList(ctx context.Context) (map[string]*BackendInfo, error) {
81+
if !ff.tpFetcher.HasBackendClusters() {
82+
return ff.static.GetBackendList(ctx)
83+
}
84+
return ff.dynamic.GetBackendList(ctx)
85+
}
86+
5987
func (pf *PDFetcher) fetchBackendList(ctx context.Context) map[string]*infosync.TiDBTopologyInfo {
6088
var backends map[string]*infosync.TiDBTopologyInfo
6189
// The jobs of PDFetcher all rely on the topology, so we retry infinitely.
@@ -98,7 +126,7 @@ func (sf *StaticFetcher) GetBackendList(context.Context) (map[string]*BackendInf
98126
func backendListToMap(addrs []string) map[string]*BackendInfo {
99127
backends := make(map[string]*BackendInfo, len(addrs))
100128
for _, addr := range addrs {
101-
backends[addr] = &BackendInfo{}
129+
backends[addr] = &BackendInfo{Addr: addr}
102130
}
103131
return backends
104132
}

pkg/balance/observer/backend_fetcher_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ func TestPDFetcher(t *testing.T) {
2626
{
2727
infos: map[string]*infosync.TiDBTopologyInfo{
2828
"1.1.1.1:4000": {
29+
Addr: "1.1.1.1:4000",
2930
Labels: map[string]string{"k1": "v1"},
3031
IP: "1.1.1.1",
3132
StatusPort: 10080,
@@ -34,6 +35,7 @@ func TestPDFetcher(t *testing.T) {
3435
check: func(m map[string]*BackendInfo) {
3536
require.Len(t, m, 1)
3637
require.NotNil(t, m["1.1.1.1:4000"])
38+
require.Equal(t, "1.1.1.1:4000", m["1.1.1.1:4000"].Addr)
3739
require.Equal(t, "1.1.1.1", m["1.1.1.1:4000"].IP)
3840
require.Equal(t, uint(10080), m["1.1.1.1:4000"].StatusPort)
3941
require.Equal(t, map[string]string{"k1": "v1"}, m["1.1.1.1:4000"].Labels)
@@ -42,24 +44,43 @@ func TestPDFetcher(t *testing.T) {
4244
{
4345
infos: map[string]*infosync.TiDBTopologyInfo{
4446
"1.1.1.1:4000": {
47+
Addr: "1.1.1.1:4000",
4548
IP: "1.1.1.1",
4649
StatusPort: 10080,
4750
},
4851
"2.2.2.2:4000": {
52+
Addr: "2.2.2.2:4000",
4953
IP: "2.2.2.2",
5054
StatusPort: 10080,
5155
},
5256
},
5357
check: func(m map[string]*BackendInfo) {
5458
require.Len(t, m, 2)
5559
require.NotNil(t, m["1.1.1.1:4000"])
60+
require.Equal(t, "1.1.1.1:4000", m["1.1.1.1:4000"].Addr)
5661
require.Equal(t, "1.1.1.1", m["1.1.1.1:4000"].IP)
5762
require.Equal(t, uint(10080), m["1.1.1.1:4000"].StatusPort)
5863
require.NotNil(t, m["2.2.2.2:4000"])
64+
require.Equal(t, "2.2.2.2:4000", m["2.2.2.2:4000"].Addr)
5965
require.Equal(t, "2.2.2.2", m["2.2.2.2:4000"].IP)
6066
require.Equal(t, uint(10080), m["2.2.2.2:4000"].StatusPort)
6167
},
6268
},
69+
{
70+
infos: map[string]*infosync.TiDBTopologyInfo{
71+
"cluster-a/shared.tidb:4000": {
72+
Addr: "shared.tidb:4000",
73+
IP: "10.0.0.1",
74+
StatusPort: 10080,
75+
},
76+
},
77+
check: func(m map[string]*BackendInfo) {
78+
require.Len(t, m, 1)
79+
require.NotNil(t, m["cluster-a/shared.tidb:4000"])
80+
require.Equal(t, "shared.tidb:4000", m["cluster-a/shared.tidb:4000"].Addr)
81+
require.Equal(t, "10.0.0.1", m["cluster-a/shared.tidb:4000"].IP)
82+
},
83+
},
6384
{
6485
ctx: func() context.Context {
6586
ctx, cancel := context.WithCancel(context.Background())
@@ -85,3 +106,31 @@ func TestPDFetcher(t *testing.T) {
85106
require.NoError(t, err)
86107
}
87108
}
109+
110+
func TestFallbackFetcher(t *testing.T) {
111+
tpFetcher := newMockTpFetcher(t)
112+
lg, _ := logger.CreateLoggerForTest(t)
113+
fetcher := NewFallbackFetcher(
114+
tpFetcher,
115+
NewPDFetcher(tpFetcher, lg, newHealthCheckConfigForTest()),
116+
NewStaticFetcher([]string{"127.0.0.1:4000"}),
117+
)
118+
119+
backends, err := fetcher.GetBackendList(context.Background())
120+
require.NoError(t, err)
121+
require.Len(t, backends, 1)
122+
require.Contains(t, backends, "127.0.0.1:4000")
123+
124+
tpFetcher.hasClusters = true
125+
tpFetcher.infos = map[string]*infosync.TiDBTopologyInfo{
126+
"cluster-a/10.0.0.1:4000": {
127+
Addr: "10.0.0.1:4000",
128+
ClusterName: "cluster-a",
129+
},
130+
}
131+
backends, err = fetcher.GetBackendList(context.Background())
132+
require.NoError(t, err)
133+
require.Len(t, backends, 1)
134+
require.Equal(t, "10.0.0.1:4000", backends["cluster-a/10.0.0.1:4000"].Addr)
135+
require.Equal(t, "cluster-a", backends["cluster-a/10.0.0.1:4000"].ClusterName)
136+
}

pkg/balance/observer/backend_health.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,17 @@ func (bh *BackendHealth) String() string {
7676

7777
// BackendInfo stores the status info of each backend.
7878
type BackendInfo struct {
79-
Labels map[string]string
80-
IP string
81-
StatusPort uint
79+
Addr string
80+
ClusterName string
81+
Labels map[string]string
82+
IP string
83+
StatusPort uint
8284
}
8385

8486
func (bi BackendInfo) Equals(other BackendInfo) bool {
85-
return bi.IP == other.IP &&
87+
return bi.Addr == other.Addr &&
88+
bi.ClusterName == other.ClusterName &&
89+
bi.IP == other.IP &&
8690
bi.StatusPort == other.StatusPort &&
8791
maps.Equal(bi.Labels, other.Labels)
8892
}

pkg/balance/observer/backend_health_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ func TestBackendHealthToString(t *testing.T) {
1515
{},
1616
{
1717
BackendInfo: BackendInfo{
18+
Addr: "127.0.0.1:4000",
1819
IP: "127.0.0.1",
1920
StatusPort: 1,
2021
Labels: map[string]string{"k1": "v1", "k2": "v2"},
@@ -45,13 +46,15 @@ func TestBackendHealthEquals(t *testing.T) {
4546
{
4647
a: BackendHealth{
4748
BackendInfo: BackendInfo{
49+
Addr: "127.0.0.1:4000",
4850
IP: "127.0.0.1",
4951
StatusPort: 1,
5052
Labels: map[string]string{"k1": "v1", "k2": "v2"},
5153
},
5254
},
5355
b: BackendHealth{
5456
BackendInfo: BackendInfo{
57+
Addr: "127.0.0.1:4000",
5558
IP: "127.0.0.1",
5659
StatusPort: 1,
5760
},
@@ -61,13 +64,15 @@ func TestBackendHealthEquals(t *testing.T) {
6164
{
6265
a: BackendHealth{
6366
BackendInfo: BackendInfo{
67+
Addr: "127.0.0.1:4000",
6468
IP: "127.0.0.1",
6569
StatusPort: 1,
6670
Labels: map[string]string{"k1": "v1", "k2": "v2"},
6771
},
6872
},
6973
b: BackendHealth{
7074
BackendInfo: BackendInfo{
75+
Addr: "127.0.0.1:4000",
7176
IP: "127.0.0.1",
7277
StatusPort: 1,
7378
Labels: map[string]string{"k1": "v1", "k2": "v2"},
@@ -78,6 +83,7 @@ func TestBackendHealthEquals(t *testing.T) {
7883
{
7984
a: BackendHealth{
8085
BackendInfo: BackendInfo{
86+
Addr: "127.0.0.1:4000",
8187
IP: "127.0.0.1",
8288
StatusPort: 1,
8389
Labels: map[string]string{"k1": "v1", "k2": "v2"},

pkg/balance/observer/backend_observer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ func (bo *DefaultBackendObserver) checkHealth(ctx context.Context, backends map[
143143
return
144144
}
145145
lastHealth := bo.curBackends[addr]
146-
health := bo.hc.Check(ctx, addr, info, lastHealth)
146+
health := bo.hc.Check(ctx, info, lastHealth)
147147
health.setLocal(cfg)
148148
lock.Lock()
149149
curBackendHealth[addr] = health

pkg/balance/observer/backend_observer_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ func (ts *observerTestSuite) addBackend() (string, BackendInfo) {
279279
ts.backendIdx++
280280
addr := fmt.Sprintf("%d", ts.backendIdx)
281281
info := &BackendInfo{
282+
Addr: addr,
282283
IP: "127.0.0.1",
283284
StatusPort: uint(ts.backendIdx),
284285
}

pkg/balance/observer/health_check.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121

2222
// HealthCheck is used to check the backends of one backend. One can pass a customized health check function to the observer.
2323
type HealthCheck interface {
24-
Check(ctx context.Context, addr string, info *BackendInfo, lastHealth *BackendHealth) *BackendHealth
24+
Check(ctx context.Context, info *BackendInfo, lastHealth *BackendHealth) *BackendHealth
2525
}
2626

2727
const (
@@ -62,7 +62,7 @@ func NewDefaultHealthCheck(httpCli *http.Client, cfg *config.HealthCheck, logger
6262
}
6363
}
6464

65-
func (dhc *DefaultHealthCheck) Check(ctx context.Context, addr string, info *BackendInfo, lastBh *BackendHealth) *BackendHealth {
65+
func (dhc *DefaultHealthCheck) Check(ctx context.Context, info *BackendInfo, lastBh *BackendHealth) *BackendHealth {
6666
bh := &BackendHealth{
6767
BackendInfo: *info,
6868
Healthy: true,
@@ -80,16 +80,22 @@ func (dhc *DefaultHealthCheck) Check(ctx context.Context, addr string, info *Bac
8080
if !bh.Healthy {
8181
return bh
8282
}
83-
dhc.checkSqlPort(ctx, addr, bh)
83+
dhc.checkSqlPort(ctx, info, bh)
8484
if !bh.Healthy {
8585
return bh
8686
}
8787
dhc.queryConfig(ctx, info, bh, lastBh)
8888
return bh
8989
}
9090

91-
func (dhc *DefaultHealthCheck) checkSqlPort(ctx context.Context, addr string, bh *BackendHealth) {
91+
func (dhc *DefaultHealthCheck) checkSqlPort(ctx context.Context, info *BackendInfo, bh *BackendHealth) {
9292
// Also dial the SQL port just in case that the SQL port hangs.
93+
if info == nil || info.Addr == "" {
94+
bh.Healthy = false
95+
bh.PingErr = errors.New("backend address is empty")
96+
return
97+
}
98+
addr := info.Addr
9399
b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(dhc.cfg.RetryInterval), uint64(dhc.cfg.MaxRetries)), ctx)
94100
err := http.ConnectWithRetry(func() error {
95101
startTime := time.Now()

0 commit comments

Comments
 (0)