Skip to content

Commit 1add4f8

Browse files
committed
backendcluster: add cluster manager and cluster-scoped topology runtime
Introduce the backend cluster manager, cluster-scoped InfoSync runtime, topology aggregation, and single-cluster compatibility hooks.
1 parent 29264bd commit 1add4f8

27 files changed

+994
-133
lines changed

AGENTS.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ When adding or modifying features, prefer extending existing packages before cre
4747
- `pkg/manager/config/` - Auto-reloads configuration files and provides interfaces to query them.
4848
- `pkg/manager/elect/` - Manages TiProxy owner elections (for example, metrics reader and VIP modules need an owner).
4949
- `pkg/manager/id/` - Generates global IDs.
50+
- `pkg/manager/backendcluster/` - Manages cluster-scoped backend runtimes and shared resources such as PD or etcd clients.
5051
- `pkg/manager/infosync/` - Queries the topology of TiDB and Prometheus from PD and updates TiProxy information to PD.
5152
- `pkg/manager/logger/` - Manages the logger service.
5253
- `pkg/manager/memory/` - Records heap and goroutine profiles when memory usage is high.

pkg/balance/observer/backend_fetcher.go

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,32 +25,46 @@ type BackendFetcher interface {
2525
// TopologyFetcher is an interface to fetch the tidb topology from ETCD.
2626
type TopologyFetcher interface {
2727
GetTiDBTopology(ctx context.Context) (map[string]*infosync.TiDBTopologyInfo, error)
28+
// HasBackendClusters reports whether dynamic PD-backed clusters are configured at all.
29+
// PDFetcher uses it to preserve the legacy behavior that static backend.instances still work
30+
// when TiProxy starts without any PD cluster and clusters are added later through the API.
31+
HasBackendClusters() bool
2832
}
2933

3034
// PDFetcher fetches backend list from PD.
3135
type PDFetcher struct {
3236
tpFetcher TopologyFetcher
3337
logger *zap.Logger
3438
config *config.HealthCheck
39+
static *StaticFetcher
3540
}
3641

37-
func NewPDFetcher(tpFetcher TopologyFetcher, logger *zap.Logger, config *config.HealthCheck) *PDFetcher {
42+
func NewPDFetcher(tpFetcher TopologyFetcher, staticAddrs []string, logger *zap.Logger, config *config.HealthCheck) *PDFetcher {
3843
config.Check()
3944
return &PDFetcher{
4045
tpFetcher: tpFetcher,
4146
logger: logger,
4247
config: config,
48+
static: NewStaticFetcher(staticAddrs),
4349
}
4450
}
4551

4652
func (pf *PDFetcher) GetBackendList(ctx context.Context) (map[string]*BackendInfo, error) {
53+
// Keep backward compatibility with the legacy static-namespace flow: before any backend cluster
54+
// is configured, backend.instances must still be routable even though namespace now always sees
55+
// a non-nil topology fetcher from the cluster manager.
56+
if !pf.tpFetcher.HasBackendClusters() {
57+
return pf.static.GetBackendList(ctx)
58+
}
4759
backends := pf.fetchBackendList(ctx)
4860
infos := make(map[string]*BackendInfo, len(backends))
49-
for addr, backend := range backends {
50-
infos[addr] = &BackendInfo{
51-
Labels: backend.Labels,
52-
IP: backend.IP,
53-
StatusPort: backend.StatusPort,
61+
for key, backend := range backends {
62+
infos[key] = &BackendInfo{
63+
Addr: backend.Addr,
64+
ClusterName: backend.ClusterName,
65+
Labels: backend.Labels,
66+
IP: backend.IP,
67+
StatusPort: backend.StatusPort,
5468
}
5569
}
5670
return infos, nil
@@ -98,7 +112,7 @@ func (sf *StaticFetcher) GetBackendList(context.Context) (map[string]*BackendInf
98112
func backendListToMap(addrs []string) map[string]*BackendInfo {
99113
backends := make(map[string]*BackendInfo, len(addrs))
100114
for _, addr := range addrs {
101-
backends[addr] = &BackendInfo{}
115+
backends[addr] = &BackendInfo{Addr: addr}
102116
}
103117
return backends
104118
}

pkg/balance/observer/backend_fetcher_test.go

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ func TestPDFetcher(t *testing.T) {
2626
{
2727
infos: map[string]*infosync.TiDBTopologyInfo{
2828
"1.1.1.1:4000": {
29+
Addr: "1.1.1.1:4000",
2930
Labels: map[string]string{"k1": "v1"},
3031
IP: "1.1.1.1",
3132
StatusPort: 10080,
@@ -34,6 +35,7 @@ func TestPDFetcher(t *testing.T) {
3435
check: func(m map[string]*BackendInfo) {
3536
require.Len(t, m, 1)
3637
require.NotNil(t, m["1.1.1.1:4000"])
38+
require.Equal(t, "1.1.1.1:4000", m["1.1.1.1:4000"].Addr)
3739
require.Equal(t, "1.1.1.1", m["1.1.1.1:4000"].IP)
3840
require.Equal(t, uint(10080), m["1.1.1.1:4000"].StatusPort)
3941
require.Equal(t, map[string]string{"k1": "v1"}, m["1.1.1.1:4000"].Labels)
@@ -42,24 +44,43 @@ func TestPDFetcher(t *testing.T) {
4244
{
4345
infos: map[string]*infosync.TiDBTopologyInfo{
4446
"1.1.1.1:4000": {
47+
Addr: "1.1.1.1:4000",
4548
IP: "1.1.1.1",
4649
StatusPort: 10080,
4750
},
4851
"2.2.2.2:4000": {
52+
Addr: "2.2.2.2:4000",
4953
IP: "2.2.2.2",
5054
StatusPort: 10080,
5155
},
5256
},
5357
check: func(m map[string]*BackendInfo) {
5458
require.Len(t, m, 2)
5559
require.NotNil(t, m["1.1.1.1:4000"])
60+
require.Equal(t, "1.1.1.1:4000", m["1.1.1.1:4000"].Addr)
5661
require.Equal(t, "1.1.1.1", m["1.1.1.1:4000"].IP)
5762
require.Equal(t, uint(10080), m["1.1.1.1:4000"].StatusPort)
5863
require.NotNil(t, m["2.2.2.2:4000"])
64+
require.Equal(t, "2.2.2.2:4000", m["2.2.2.2:4000"].Addr)
5965
require.Equal(t, "2.2.2.2", m["2.2.2.2:4000"].IP)
6066
require.Equal(t, uint(10080), m["2.2.2.2:4000"].StatusPort)
6167
},
6268
},
69+
{
70+
infos: map[string]*infosync.TiDBTopologyInfo{
71+
"cluster-a/shared.tidb:4000": {
72+
Addr: "shared.tidb:4000",
73+
IP: "10.0.0.1",
74+
StatusPort: 10080,
75+
},
76+
},
77+
check: func(m map[string]*BackendInfo) {
78+
require.Len(t, m, 1)
79+
require.NotNil(t, m["cluster-a/shared.tidb:4000"])
80+
require.Equal(t, "shared.tidb:4000", m["cluster-a/shared.tidb:4000"].Addr)
81+
require.Equal(t, "10.0.0.1", m["cluster-a/shared.tidb:4000"].IP)
82+
},
83+
},
6384
{
6485
ctx: func() context.Context {
6586
ctx, cancel := context.WithCancel(context.Background())
@@ -74,9 +95,10 @@ func TestPDFetcher(t *testing.T) {
7495

7596
tpFetcher := newMockTpFetcher(t)
7697
lg, _ := logger.CreateLoggerForTest(t)
77-
pf := NewPDFetcher(tpFetcher, lg, newHealthCheckConfigForTest())
98+
pf := NewPDFetcher(tpFetcher, nil, lg, newHealthCheckConfigForTest())
7899
for _, test := range tests {
79100
tpFetcher.infos = test.infos
101+
tpFetcher.hasClusters = true
80102
if test.ctx == nil {
81103
test.ctx = context.Background()
82104
}
@@ -85,3 +107,27 @@ func TestPDFetcher(t *testing.T) {
85107
require.NoError(t, err)
86108
}
87109
}
110+
111+
func TestPDFetcherFallbackToStaticWithoutBackendClusters(t *testing.T) {
112+
tpFetcher := newMockTpFetcher(t)
113+
lg, _ := logger.CreateLoggerForTest(t)
114+
fetcher := NewPDFetcher(tpFetcher, []string{"127.0.0.1:4000"}, lg, newHealthCheckConfigForTest())
115+
116+
backends, err := fetcher.GetBackendList(context.Background())
117+
require.NoError(t, err)
118+
require.Len(t, backends, 1)
119+
require.Contains(t, backends, "127.0.0.1:4000")
120+
121+
tpFetcher.hasClusters = true
122+
tpFetcher.infos = map[string]*infosync.TiDBTopologyInfo{
123+
"cluster-a/10.0.0.1:4000": {
124+
Addr: "10.0.0.1:4000",
125+
ClusterName: "cluster-a",
126+
},
127+
}
128+
backends, err = fetcher.GetBackendList(context.Background())
129+
require.NoError(t, err)
130+
require.Len(t, backends, 1)
131+
require.Equal(t, "10.0.0.1:4000", backends["cluster-a/10.0.0.1:4000"].Addr)
132+
require.Equal(t, "cluster-a", backends["cluster-a/10.0.0.1:4000"].ClusterName)
133+
}

pkg/balance/observer/backend_health.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,17 @@ func (bh *BackendHealth) String() string {
7676

7777
// BackendInfo stores the status info of each backend.
7878
type BackendInfo struct {
79-
Labels map[string]string
80-
IP string
81-
StatusPort uint
79+
Addr string
80+
ClusterName string
81+
Labels map[string]string
82+
IP string
83+
StatusPort uint
8284
}
8385

8486
func (bi BackendInfo) Equals(other BackendInfo) bool {
85-
return bi.IP == other.IP &&
87+
return bi.Addr == other.Addr &&
88+
bi.ClusterName == other.ClusterName &&
89+
bi.IP == other.IP &&
8690
bi.StatusPort == other.StatusPort &&
8791
maps.Equal(bi.Labels, other.Labels)
8892
}

pkg/balance/observer/backend_health_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ func TestBackendHealthToString(t *testing.T) {
1515
{},
1616
{
1717
BackendInfo: BackendInfo{
18+
Addr: "127.0.0.1:4000",
1819
IP: "127.0.0.1",
1920
StatusPort: 1,
2021
Labels: map[string]string{"k1": "v1", "k2": "v2"},
@@ -45,13 +46,15 @@ func TestBackendHealthEquals(t *testing.T) {
4546
{
4647
a: BackendHealth{
4748
BackendInfo: BackendInfo{
49+
Addr: "127.0.0.1:4000",
4850
IP: "127.0.0.1",
4951
StatusPort: 1,
5052
Labels: map[string]string{"k1": "v1", "k2": "v2"},
5153
},
5254
},
5355
b: BackendHealth{
5456
BackendInfo: BackendInfo{
57+
Addr: "127.0.0.1:4000",
5558
IP: "127.0.0.1",
5659
StatusPort: 1,
5760
},
@@ -61,13 +64,15 @@ func TestBackendHealthEquals(t *testing.T) {
6164
{
6265
a: BackendHealth{
6366
BackendInfo: BackendInfo{
67+
Addr: "127.0.0.1:4000",
6468
IP: "127.0.0.1",
6569
StatusPort: 1,
6670
Labels: map[string]string{"k1": "v1", "k2": "v2"},
6771
},
6872
},
6973
b: BackendHealth{
7074
BackendInfo: BackendInfo{
75+
Addr: "127.0.0.1:4000",
7176
IP: "127.0.0.1",
7277
StatusPort: 1,
7378
Labels: map[string]string{"k1": "v1", "k2": "v2"},
@@ -78,6 +83,7 @@ func TestBackendHealthEquals(t *testing.T) {
7883
{
7984
a: BackendHealth{
8085
BackendInfo: BackendInfo{
86+
Addr: "127.0.0.1:4000",
8187
IP: "127.0.0.1",
8288
StatusPort: 1,
8389
Labels: map[string]string{"k1": "v1", "k2": "v2"},

pkg/balance/observer/backend_observer_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ func (ts *observerTestSuite) addBackend() (string, BackendInfo) {
279279
ts.backendIdx++
280280
addr := fmt.Sprintf("%d", ts.backendIdx)
281281
info := &BackendInfo{
282+
Addr: addr,
282283
IP: "127.0.0.1",
283284
StatusPort: uint(ts.backendIdx),
284285
}

pkg/balance/observer/health_check.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121

2222
// HealthCheck is used to check the backends of one backend. One can pass a customized health check function to the observer.
2323
type HealthCheck interface {
24-
Check(ctx context.Context, addr string, info *BackendInfo, lastHealth *BackendHealth) *BackendHealth
24+
Check(ctx context.Context, backendID string, info *BackendInfo, lastHealth *BackendHealth) *BackendHealth
2525
}
2626

2727
const (
@@ -62,7 +62,7 @@ func NewDefaultHealthCheck(httpCli *http.Client, cfg *config.HealthCheck, logger
6262
}
6363
}
6464

65-
func (dhc *DefaultHealthCheck) Check(ctx context.Context, addr string, info *BackendInfo, lastBh *BackendHealth) *BackendHealth {
65+
func (dhc *DefaultHealthCheck) Check(ctx context.Context, _ string, info *BackendInfo, lastBh *BackendHealth) *BackendHealth {
6666
bh := &BackendHealth{
6767
BackendInfo: *info,
6868
Healthy: true,
@@ -80,16 +80,22 @@ func (dhc *DefaultHealthCheck) Check(ctx context.Context, addr string, info *Bac
8080
if !bh.Healthy {
8181
return bh
8282
}
83-
dhc.checkSqlPort(ctx, addr, bh)
83+
dhc.checkSqlPort(ctx, info, bh)
8484
if !bh.Healthy {
8585
return bh
8686
}
8787
dhc.queryConfig(ctx, info, bh, lastBh)
8888
return bh
8989
}
9090

91-
func (dhc *DefaultHealthCheck) checkSqlPort(ctx context.Context, addr string, bh *BackendHealth) {
91+
func (dhc *DefaultHealthCheck) checkSqlPort(ctx context.Context, info *BackendInfo, bh *BackendHealth) {
9292
// Also dial the SQL port just in case that the SQL port hangs.
93+
if info == nil || info.Addr == "" {
94+
bh.Healthy = false
95+
bh.PingErr = errors.New("backend address is empty")
96+
return
97+
}
98+
addr := info.Addr
9399
b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(dhc.cfg.RetryInterval), uint64(dhc.cfg.MaxRetries)), ctx)
94100
err := http.ConnectWithRetry(func() error {
95101
startTime := time.Now()

pkg/balance/observer/health_check_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ func newBackendServer(t *testing.T) (*backendServer, *BackendInfo) {
143143
backend.setSqlResp(true)
144144
backend.startSQLServer()
145145
return backend, &BackendInfo{
146+
Addr: backend.sqlAddr,
146147
IP: backend.ip,
147148
StatusPort: backend.statusPort,
148149
}

pkg/balance/observer/mock_test.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@ import (
1919
)
2020

2121
type mockTpFetcher struct {
22-
t *testing.T
23-
infos map[string]*infosync.TiDBTopologyInfo
24-
err error
22+
t *testing.T
23+
infos map[string]*infosync.TiDBTopologyInfo
24+
err error
25+
hasClusters bool
2526
}
2627

2728
func newMockTpFetcher(t *testing.T) *mockTpFetcher {
@@ -34,6 +35,10 @@ func (ft *mockTpFetcher) GetTiDBTopology(ctx context.Context) (map[string]*infos
3435
return ft.infos, ft.err
3536
}
3637

38+
func (ft *mockTpFetcher) HasBackendClusters() bool {
39+
return ft.hasClusters
40+
}
41+
3742
type mockBackendFetcher struct {
3843
sync.Mutex
3944
backends map[string]*BackendInfo
@@ -82,11 +87,11 @@ func newMockHealthCheck() *mockHealthCheck {
8287
}
8388
}
8489

85-
func (mhc *mockHealthCheck) Check(_ context.Context, addr string, info *BackendInfo, _ *BackendHealth) *BackendHealth {
90+
func (mhc *mockHealthCheck) Check(_ context.Context, backendID string, info *BackendInfo, _ *BackendHealth) *BackendHealth {
8691
mhc.Lock()
8792
defer mhc.Unlock()
88-
mhc.backends[addr].BackendInfo = *info
89-
return mhc.backends[addr]
93+
mhc.backends[backendID].BackendInfo = *info
94+
return mhc.backends[backendID]
9095
}
9196

9297
func (mhc *mockHealthCheck) setBackend(addr string, health *BackendHealth) {

0 commit comments

Comments (0)