Skip to content

Commit a8ab422

Browse files
committed
metricsreader: move cluster-scoped metrics lifecycle into backend clusters
1 parent 1add4f8 commit a8ab422

File tree

17 files changed

+472
-86
lines changed

17 files changed

+472
-86
lines changed

lib/config/proxy.go

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,8 @@ var (
2424
ErrInvalidConfigValue = errors.New("invalid config value")
2525
)
2626

27+
// DefaultBackendClusterName is the name given to the backend cluster entry
// that GetBackendClusters builds from Proxy.PDAddrs.
const DefaultBackendClusterName = "default"
28+
2729
type Config struct {
2830
Proxy ProxyServer `yaml:"proxy,omitempty" toml:"proxy,omitempty" json:"proxy,omitempty"`
2931
API API `yaml:"api,omitempty" toml:"api,omitempty" json:"api,omitempty"`
@@ -249,7 +251,7 @@ func (cfg *Config) GetBackendClusters() []BackendCluster {
249251
return nil
250252
}
251253
return []BackendCluster{{
252-
Name: "default",
254+
Name: DefaultBackendClusterName,
253255
PDAddrs: cfg.Proxy.PDAddrs,
254256
}}
255257
}

lib/config/proxy_test.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -314,7 +314,7 @@ func TestGetBackendClusters(t *testing.T) {
314314

315315
clusters := cfg.GetBackendClusters()
316316
require.Len(t, clusters, 1)
317-
require.Equal(t, "default", clusters[0].Name)
317+
require.Equal(t, DefaultBackendClusterName, clusters[0].Name)
318318
require.Equal(t, cfg.Proxy.PDAddrs, clusters[0].PDAddrs)
319319

320320
cfg.Proxy.BackendClusters = []BackendCluster{

pkg/balance/factor/factor_balance.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -30,7 +30,7 @@ type FactorBasedBalance struct {
3030
factors []Factor
3131
// to reduce memory allocation
3232
cachedList []scoredBackend
33-
mr metricsreader.MetricsReader
33+
mr metricsreader.MetricsQuerier
3434
lg *zap.Logger
3535
factorStatus *FactorStatus
3636
factorLabel *FactorLabel
@@ -44,7 +44,7 @@ type FactorBasedBalance struct {
4444
routePolicy string
4545
}
4646

47-
func NewFactorBasedBalance(lg *zap.Logger, mr metricsreader.MetricsReader) *FactorBasedBalance {
47+
func NewFactorBasedBalance(lg *zap.Logger, mr metricsreader.MetricsQuerier) *FactorBasedBalance {
4848
return &FactorBasedBalance{
4949
lg: lg,
5050
mr: mr,

pkg/balance/factor/factor_cpu.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -95,13 +95,13 @@ type FactorCPU struct {
9595
lastMetricTime time.Time
9696
// The estimated average CPU usage used by one connection.
9797
usagePerConn float64
98-
mr metricsreader.MetricsReader
98+
mr metricsreader.MetricsQuerier
9999
bitNum int
100100
migrationsPerSecond float64
101101
lg *zap.Logger
102102
}
103103

104-
func NewFactorCPU(mr metricsreader.MetricsReader, lg *zap.Logger) *FactorCPU {
104+
func NewFactorCPU(mr metricsreader.MetricsQuerier, lg *zap.Logger) *FactorCPU {
105105
fc := &FactorCPU{
106106
mr: mr,
107107
bitNum: 5,

pkg/balance/factor/factor_health.go

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -187,13 +187,13 @@ type errIndicator struct {
187187
type FactorHealth struct {
188188
snapshot map[string]healthBackendSnapshot
189189
indicators []errIndicator
190-
mr metricsreader.MetricsReader
190+
mr metricsreader.MetricsQuerier
191191
bitNum int
192192
migrationsPerSecond float64
193193
lg *zap.Logger
194194
}
195195

196-
func NewFactorHealth(mr metricsreader.MetricsReader, lg *zap.Logger) *FactorHealth {
196+
func NewFactorHealth(mr metricsreader.MetricsQuerier, lg *zap.Logger) *FactorHealth {
197197
return &FactorHealth{
198198
mr: mr,
199199
snapshot: make(map[string]healthBackendSnapshot),
@@ -203,7 +203,7 @@ func NewFactorHealth(mr metricsreader.MetricsReader, lg *zap.Logger) *FactorHeal
203203
}
204204
}
205205

206-
func initErrIndicator(mr metricsreader.MetricsReader) []errIndicator {
206+
func initErrIndicator(mr metricsreader.MetricsQuerier) []errIndicator {
207207
indicators := make([]errIndicator, 0, len(errDefinitions))
208208
for _, def := range errDefinitions {
209209
indicator := errIndicator{

pkg/balance/factor/factor_memory.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -107,13 +107,13 @@ type FactorMemory struct {
107107
snapshot map[string]memBackendSnapshot
108108
// The updated time of the metric that we've read last time.
109109
lastMetricTime time.Time
110-
mr metricsreader.MetricsReader
110+
mr metricsreader.MetricsQuerier
111111
bitNum int
112112
migrationsPerSecond float64
113113
lg *zap.Logger
114114
}
115115

116-
func NewFactorMemory(mr metricsreader.MetricsReader, lg *zap.Logger) *FactorMemory {
116+
func NewFactorMemory(mr metricsreader.MetricsQuerier, lg *zap.Logger) *FactorMemory {
117117
bitNum := 0
118118
for levels := len(oomRiskLevels); ; bitNum++ {
119119
if levels == 0 {

pkg/balance/metricsreader/backend_reader.go

Lines changed: 34 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -9,6 +9,7 @@ import (
99
"fmt"
1010
"math"
1111
"net"
12+
"net/url"
1213
"slices"
1314
"strconv"
1415
"strings"
@@ -35,18 +36,15 @@ import (
3536

3637
const (
3738
// readerOwnerKeySuffix is the last segment of the etcd key used for backend reader owner election.
38-
// For global owner, the key is "/tiproxy/metric_reader/owner".
39-
// For zonal owner, the key is "/tiproxy/metric_reader/{zone}/owner".
40-
readerOwnerKeyPrefix = "/tiproxy/metric_reader"
39+
// For the default cluster, the key is "/tiproxy/metric_reader/owner".
40+
// For a named cluster, the key is "/tiproxy/metric_reader/{cluster}/owner".
// When a zone is configured, "/{zone}" is inserted before "/owner".
4141
readerOwnerKeySuffix = "owner"
4242
// sessionTTL is the session's TTL in seconds for backend reader owner election.
4343
sessionTTL = 15
4444
// backendMetricPath is the path of backend HTTP API to read metrics.
4545
backendMetricPath = "/metrics"
46-
// ownerMetricPath is the path of reading backend metrics from the backend reader owner.
47-
ownerMetricPath = "/api/backend/metrics"
48-
goPoolSize = 100
49-
goMaxIdle = time.Minute
46+
goPoolSize = 100
47+
goMaxIdle = time.Minute
5048
)
5149

5250
var (
@@ -72,6 +70,7 @@ type BackendReader struct {
7270
marshalledHistory []byte
7371
cfgGetter config.ConfigGetter
7472
backendFetcher TopologyFetcher
73+
clusterName string
7574
lastZone string
7675
electionCfg elect.ElectionConfig
7776
election elect.Election
@@ -84,6 +83,11 @@ type BackendReader struct {
8483
}
8584

8685
// NewBackendReader creates a BackendReader that is not bound to a named
// cluster: it delegates to NewClusterBackendReader with an empty cluster name.
func NewBackendReader(lg *zap.Logger, cfgGetter config.ConfigGetter, httpCli *http.Client, etcdCli *clientv3.Client,
86+
backendFetcher TopologyFetcher, cfg *config.HealthCheck) *BackendReader {
87+
return NewClusterBackendReader(lg, "", cfgGetter, httpCli, etcdCli, backendFetcher, cfg)
88+
}
89+
90+
func NewClusterBackendReader(lg *zap.Logger, clusterName string, cfgGetter config.ConfigGetter, httpCli *http.Client, etcdCli *clientv3.Client,
8791
backendFetcher TopologyFetcher, cfg *config.HealthCheck) *BackendReader {
8892
return &BackendReader{
8993
queryRules: make(map[string]QueryRule),
@@ -92,6 +96,7 @@ func NewBackendReader(lg *zap.Logger, cfgGetter config.ConfigGetter, httpCli *ht
9296
lg: lg,
9397
cfgGetter: cfgGetter,
9498
backendFetcher: backendFetcher,
99+
clusterName: strings.TrimSpace(clusterName),
95100
cfg: cfg,
96101
wgp: waitgroup.NewWaitGroupPool(goPoolSize, goMaxIdle),
97102
electionCfg: elect.DefaultElectionConfig(sessionTTL),
@@ -118,9 +123,9 @@ func (br *BackendReader) initElection(ctx context.Context, cfg *config.Config) e
118123
br.lastZone = cfg.GetLocation()
119124
if len(br.lastZone) > 0 {
120125
// Zonal owners are responsible for the backends in the same zone or not in any TiProxy zone.
121-
key = fmt.Sprintf("%s/%s/%s", readerOwnerKeyPrefix, br.lastZone, readerOwnerKeySuffix)
126+
key = fmt.Sprintf("%s/%s/%s", readerOwnerKeyPrefix(br.clusterName), br.lastZone, readerOwnerKeySuffix)
122127
} else {
123-
key = fmt.Sprintf("%s/%s", readerOwnerKeyPrefix, readerOwnerKeySuffix)
128+
key = fmt.Sprintf("%s/%s", readerOwnerKeyPrefix(br.clusterName), readerOwnerKeySuffix)
124129
}
125130
br.election = elect.NewElection(br.lg.Named("elect"), br.etcdCli, br.electionCfg, id, key, br)
126131
br.election.Start(ctx)
@@ -213,7 +218,8 @@ func (br *BackendReader) queryAllOwners(ctx context.Context) (zones, owners []st
213218
// Get all owner keys.
214219
opts := []clientv3.OpOption{clientv3.WithPrefix()}
215220
var kvs []*mvccpb.KeyValue
216-
kvs, err = etcd.GetKVs(ctx, br.etcdCli, readerOwnerKeyPrefix, opts, br.electionCfg.Timeout, br.electionCfg.RetryIntvl, br.electionCfg.RetryCnt)
221+
keyPrefix := readerOwnerKeyPrefix(br.clusterName)
222+
kvs, err = etcd.GetKVs(ctx, br.etcdCli, keyPrefix, opts, br.electionCfg.Timeout, br.electionCfg.RetryIntvl, br.electionCfg.RetryCnt)
217223
if err != nil {
218224
return
219225
}
@@ -227,7 +233,7 @@ func (br *BackendReader) queryAllOwners(ctx context.Context) (zones, owners []st
227233
ownerMap := make(map[string]ownerInfo)
228234
for _, kv := range kvs {
229235
key := hack.String(kv.Key)
230-
key = key[len(readerOwnerKeyPrefix):]
236+
key = key[len(keyPrefix):]
231237
if len(key) == 0 || key[0] != '/' {
232238
continue
233239
}
@@ -466,7 +472,7 @@ func (br *BackendReader) GetBackendMetrics() []byte {
466472
// If every member queries directly from backends, the backends may suffer from too much pressure.
467473
func (br *BackendReader) readFromOwner(ctx context.Context, ownerAddr string) error {
468474
b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(br.cfg.RetryInterval), uint64(br.cfg.MaxRetries)), ctx)
469-
resp, err := br.httpCli.Get(ownerAddr, ownerMetricPath, b, br.cfg.DialTimeout)
475+
resp, err := br.httpCli.Get(ownerAddr, backendMetricOwnerPath(br.clusterName), b, br.cfg.DialTimeout)
470476
if err != nil {
471477
return err
472478
}
@@ -570,6 +576,22 @@ func (br *BackendReader) Close() {
570576
}
571577
}
572578

579+
// readerOwnerKeyPrefix returns the etcd key prefix for the backend reader
// owner election of the given cluster.
//
// Surrounding spaces in clusterName are ignored. An empty name and
// config.DefaultBackendClusterName both map to "/tiproxy/metric_reader";
// any other name is appended to that path as one more segment.
//
// NOTE(review): the default prefix is a string prefix of every named
// cluster's prefix, so a prefix scan for the default cluster would also match
// named-cluster keys if they are stored in the same etcd — confirm whether
// clusters can share an etcd.
// NOTE(review): clusterName is inserted verbatim; a name containing "/" would
// add extra path segments — confirm names are validated elsewhere.
func readerOwnerKeyPrefix(clusterName string) string {
580+
clusterName = strings.TrimSpace(clusterName)
581+
if clusterName == "" || clusterName == config.DefaultBackendClusterName {
582+
return "/tiproxy/metric_reader"
583+
}
584+
return fmt.Sprintf("/tiproxy/metric_reader/%s", clusterName)
585+
}
586+
587+
// backendMetricOwnerPath returns the HTTP path (with optional query string)
// used to read backend metrics from the backend reader owner.
//
// Surrounding spaces in clusterName are ignored. An empty name yields
// "/api/backend/metrics"; otherwise the query-escaped name is passed in the
// "cluster" query parameter.
//
// NOTE(review): unlike readerOwnerKeyPrefix, config.DefaultBackendClusterName
// is not treated as empty here, so "default" produces "?cluster=default" —
// confirm the owner's handler accepts that.
func backendMetricOwnerPath(clusterName string) string {
588+
clusterName = strings.TrimSpace(clusterName)
589+
if clusterName == "" {
590+
return "/api/backend/metrics"
591+
}
592+
return fmt.Sprintf("/api/backend/metrics?cluster=%s", url.QueryEscape(clusterName))
593+
}
594+
573595
func purgeHistory(history []model.SamplePair, retention time.Duration, now time.Time) []model.SamplePair {
574596
idx := -1
575597
for i := range history {

pkg/balance/metricsreader/backend_reader_test.go

Lines changed: 12 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -1149,7 +1149,7 @@ func TestQueryAllOwners(t *testing.T) {
11491149
br := NewBackendReader(lg, nil, nil, suite.client, nil, nil)
11501150
for i, test := range tests {
11511151
for i, key := range test.keys {
1152-
key = fmt.Sprintf("%s%s", readerOwnerKeyPrefix, key)
1152+
key = fmt.Sprintf("%s%s", readerOwnerKeyPrefix(""), key)
11531153
suite.putKV(key, test.values[i])
11541154
}
11551155
zones, owners, err := br.queryAllOwners(context.Background())
@@ -1166,7 +1166,7 @@ func TestQueryAllOwners(t *testing.T) {
11661166
slices.Sort(zones)
11671167
require.Equal(t, test.zones, zones, "case %d", i)
11681168
}
1169-
suite.delKV(readerOwnerKeyPrefix)
1169+
suite.delKV(readerOwnerKeyPrefix(""))
11701170
}
11711171
}
11721172

@@ -1184,15 +1184,15 @@ func TestUpdateLabel(t *testing.T) {
11841184
defer br.Close()
11851185

11861186
checkKeyPrefix := func(prefix string) bool {
1187-
kvs := suite.getKV(readerOwnerKeyPrefix)
1187+
kvs := suite.getKV(readerOwnerKeyPrefix(""))
11881188
if len(kvs) != 1 {
11891189
return false
11901190
}
11911191
return strings.HasPrefix(string(kvs[0].Key), prefix)
11921192
}
11931193

11941194
// campaign for the global owner
1195-
prefix := fmt.Sprintf("%s/%s", readerOwnerKeyPrefix, readerOwnerKeySuffix)
1195+
prefix := fmt.Sprintf("%s/%s", readerOwnerKeyPrefix(""), readerOwnerKeySuffix)
11961196
require.Eventually(t, func() bool {
11971197
return checkKeyPrefix(prefix)
11981198
}, 3*time.Second, 10*time.Millisecond)
@@ -1201,7 +1201,7 @@ func TestUpdateLabel(t *testing.T) {
12011201
cfg.Labels = map[string]string{config.LocationLabelName: "east"}
12021202
err = br.ReadMetrics(context.Background())
12031203
require.NoError(t, err)
1204-
prefix = fmt.Sprintf("%s/east/%s", readerOwnerKeyPrefix, readerOwnerKeySuffix)
1204+
prefix = fmt.Sprintf("%s/east/%s", readerOwnerKeyPrefix(""), readerOwnerKeySuffix)
12051205
require.Eventually(t, func() bool {
12061206
return checkKeyPrefix(prefix)
12071207
}, 3*time.Second, 10*time.Millisecond)
@@ -1255,7 +1255,7 @@ func TestElection(t *testing.T) {
12551255
// setup etcd
12561256
suite := newEtcdTestSuite(t)
12571257
t.Cleanup(suite.close)
1258-
ownerKey := fmt.Sprintf("%s/%s", readerOwnerKeyPrefix, readerOwnerKeySuffix)
1258+
ownerKey := fmt.Sprintf("%s/%s", readerOwnerKeyPrefix(""), readerOwnerKeySuffix)
12591259
suite.putKV(ownerKey, addr)
12601260
require.Eventually(t, func() bool {
12611261
kvs := suite.getKV(ownerKey)
@@ -1330,3 +1330,9 @@ func setupTypicalBackendListener(t *testing.T, respBody string) (backendPort int
13301330
t.Cleanup(backendHttpHandler.Close)
13311331
return
13321332
}
1333+
1334+
// TestBackendMetricOwnerPath checks the owner metrics path for an empty
// cluster name, a plain name, and a name that needs query escaping.
func TestBackendMetricOwnerPath(t *testing.T) {
1335+
require.Equal(t, "/api/backend/metrics", backendMetricOwnerPath(""))
1336+
require.Equal(t, "/api/backend/metrics?cluster=cluster-a", backendMetricOwnerPath("cluster-a"))
1337+
require.Equal(t, "/api/backend/metrics?cluster=cluster+a%2Fb", backendMetricOwnerPath("cluster a/b"))
1338+
}

0 commit comments

Comments
 (0)