Skip to content

Commit 31d680a

Browse files
committed
metricsreader: move cluster-scoped metrics lifecycle into backend clusters
Signed-off-by: Yang Keao <yangkeao@chunibyo.icu>
1 parent 30d5b3f commit 31d680a

32 files changed

+827
-139
lines changed

lib/config/proxy.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ var (
2424
ErrInvalidConfigValue = errors.New("invalid config value")
2525
)
2626

27+
const DefaultBackendClusterName = "default"
28+
2729
type Config struct {
2830
Proxy ProxyServer `yaml:"proxy,omitempty" toml:"proxy,omitempty" json:"proxy,omitempty"`
2931
API API `yaml:"api,omitempty" toml:"api,omitempty" json:"api,omitempty"`
@@ -249,7 +251,7 @@ func (cfg *Config) GetBackendClusters() []BackendCluster {
249251
return nil
250252
}
251253
return []BackendCluster{{
252-
Name: "default",
254+
Name: DefaultBackendClusterName,
253255
PDAddrs: cfg.Proxy.PDAddrs,
254256
}}
255257
}

lib/config/proxy_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ func TestGetBackendClusters(t *testing.T) {
326326

327327
clusters := cfg.GetBackendClusters()
328328
require.Len(t, clusters, 1)
329-
require.Equal(t, "default", clusters[0].Name)
329+
require.Equal(t, DefaultBackendClusterName, clusters[0].Name)
330330
require.Equal(t, cfg.Proxy.PDAddrs, clusters[0].PDAddrs)
331331

332332
cfg.Proxy.BackendClusters = []BackendCluster{

pkg/balance/factor/factor_balance.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type FactorBasedBalance struct {
3030
factors []Factor
3131
// to reduce memory allocation
3232
cachedList []scoredBackend
33-
mr metricsreader.MetricsReader
33+
mr metricsreader.MetricsQuerier
3434
lg *zap.Logger
3535
factorStatus *FactorStatus
3636
factorLabel *FactorLabel
@@ -44,7 +44,7 @@ type FactorBasedBalance struct {
4444
routePolicy string
4545
}
4646

47-
func NewFactorBasedBalance(lg *zap.Logger, mr metricsreader.MetricsReader) *FactorBasedBalance {
47+
func NewFactorBasedBalance(lg *zap.Logger, mr metricsreader.MetricsQuerier) *FactorBasedBalance {
4848
return &FactorBasedBalance{
4949
lg: lg,
5050
mr: mr,

pkg/balance/factor/factor_cpu.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -95,13 +95,13 @@ type FactorCPU struct {
9595
lastMetricTime time.Time
9696
// The estimated average CPU usage used by one connection.
9797
usagePerConn float64
98-
mr metricsreader.MetricsReader
98+
mr metricsreader.MetricsQuerier
9999
bitNum int
100100
migrationsPerSecond float64
101101
lg *zap.Logger
102102
}
103103

104-
func NewFactorCPU(mr metricsreader.MetricsReader, lg *zap.Logger) *FactorCPU {
104+
func NewFactorCPU(mr metricsreader.MetricsQuerier, lg *zap.Logger) *FactorCPU {
105105
fc := &FactorCPU{
106106
mr: mr,
107107
bitNum: 5,
@@ -146,6 +146,7 @@ func (fc *FactorCPU) updateSnapshot(qr metricsreader.QueryResult, backends []sco
146146
now := time.Now()
147147
for _, backend := range backends {
148148
addr := backend.Addr()
149+
key := backend.ID()
149150
// If a backend exists in metrics but not in the backend list, ignore it for this round.
150151
// The backend will be in the next round if it's healthy.
151152
pairs := qr.GetSamplePair4Backend(backend)
@@ -155,7 +156,7 @@ func (fc *FactorCPU) updateSnapshot(qr metricsreader.QueryResult, backends []sco
155156
updateTime := time.UnixMilli(int64(pairs[len(pairs)-1].Timestamp))
156157
// The time point of updating each backend is different, so only partial of the backends are updated every time.
157158
// If this backend is not updated, ignore it.
158-
snapshot := fc.snapshot[addr]
159+
snapshot := fc.snapshot[key]
159160
if !snapshot.updatedTime.Before(updateTime) {
160161
continue
161162
}
@@ -164,7 +165,7 @@ func (fc *FactorCPU) updateSnapshot(qr metricsreader.QueryResult, backends []sco
164165
continue
165166
}
166167
metrics.BackendMetricGauge.WithLabelValues(addr, "cpu").Set(avgUsage)
167-
fc.snapshot[addr] = cpuBackendSnapshot{
168+
fc.snapshot[key] = cpuBackendSnapshot{
168169
avgUsage: avgUsage,
169170
latestUsage: latestUsage,
170171
connCount: backend.ConnCount(),
@@ -239,7 +240,7 @@ func (fc *FactorCPU) updateCpuPerConn() {
239240

240241
// Estimate the current cpu usage by the latest CPU usage, the latest connection count, and the current connection count.
241242
func (fc *FactorCPU) getUsage(backend scoredBackend) (avgUsage, latestUsage float64) {
242-
snapshot, ok := fc.snapshot[backend.Addr()]
243+
snapshot, ok := fc.snapshot[backend.ID()]
243244
if !ok || snapshot.avgUsage < 0 || latestUsage < 0 {
244245
// The metric has missed for minutes.
245246
return 1, 1
@@ -260,11 +261,11 @@ func (fc *FactorCPU) BalanceCount(from, to scoredBackend) (BalanceAdvice, float6
260261
fields := []zap.Field{
261262
zap.Float64("from_avg_usage", fromAvgUsage),
262263
zap.Float64("from_latest_usage", fromLatestUsage),
263-
zap.Int("from_snapshot_conn", fc.snapshot[from.Addr()].connCount),
264+
zap.Int("from_snapshot_conn", fc.snapshot[from.ID()].connCount),
264265
zap.Int("from_conn", from.ConnScore()),
265266
zap.Float64("to_avg_usage", toAvgUsage),
266267
zap.Float64("to_latest_usage", toLatestUsage),
267-
zap.Int("to_snapshot_conn", fc.snapshot[to.Addr()].connCount),
268+
zap.Int("to_snapshot_conn", fc.snapshot[to.ID()].connCount),
268269
zap.Int("to_conn", to.ConnScore()),
269270
zap.Float64("usage_per_conn", fc.usagePerConn),
270271
}

pkg/balance/factor/factor_cpu_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,34 @@ func TestCPUResultNotUpdated(t *testing.T) {
301301
}
302302
}
303303

304+
func TestCPUSnapshotUsesBackendID(t *testing.T) {
305+
now := model.Now()
306+
backends := []scoredBackend{
307+
createBackendWithAddrID("shared:4000", "cluster-a/shared:4000", "10.0.0.1", 10080, 10, 10),
308+
createBackendWithAddrID("shared:4000", "cluster-b/shared:4000", "10.0.0.2", 10080, 20, 20),
309+
}
310+
mmr := &mockMetricsReader{
311+
qrs: map[string]metricsreader.QueryResult{
312+
"cpu": {
313+
UpdateTime: time.Now(),
314+
Value: model.Matrix([]*model.SampleStream{
315+
createSampleStreamForInstance([]float64{0.2, 0.2}, "10.0.0.1:10080", now),
316+
createSampleStreamForInstance([]float64{0.6, 0.6}, "10.0.0.2:10080", now),
317+
}),
318+
},
319+
},
320+
}
321+
fc := NewFactorCPU(mmr, zap.NewNop())
322+
323+
fc.UpdateScore(backends)
324+
325+
require.Len(t, fc.snapshot, 2)
326+
require.Contains(t, fc.snapshot, "cluster-a/shared:4000")
327+
require.Contains(t, fc.snapshot, "cluster-b/shared:4000")
328+
require.Equal(t, 0.2, fc.snapshot["cluster-a/shared:4000"].latestUsage)
329+
require.Equal(t, 0.6, fc.snapshot["cluster-b/shared:4000"].latestUsage)
330+
}
331+
304332
func TestCPUQueryRule(t *testing.T) {
305333
tests := []struct {
306334
text string

pkg/balance/factor/factor_health.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -187,13 +187,13 @@ type errIndicator struct {
187187
type FactorHealth struct {
188188
snapshot map[string]healthBackendSnapshot
189189
indicators []errIndicator
190-
mr metricsreader.MetricsReader
190+
mr metricsreader.MetricsQuerier
191191
bitNum int
192192
migrationsPerSecond float64
193193
lg *zap.Logger
194194
}
195195

196-
func NewFactorHealth(mr metricsreader.MetricsReader, lg *zap.Logger) *FactorHealth {
196+
func NewFactorHealth(mr metricsreader.MetricsQuerier, lg *zap.Logger) *FactorHealth {
197197
return &FactorHealth{
198198
mr: mr,
199199
snapshot: make(map[string]healthBackendSnapshot),
@@ -203,7 +203,7 @@ func NewFactorHealth(mr metricsreader.MetricsReader, lg *zap.Logger) *FactorHeal
203203
}
204204
}
205205

206-
func initErrIndicator(mr metricsreader.MetricsReader) []errIndicator {
206+
func initErrIndicator(mr metricsreader.MetricsQuerier) []errIndicator {
207207
indicators := make([]errIndicator, 0, len(errDefinitions))
208208
for _, def := range errDefinitions {
209209
indicator := errIndicator{
@@ -268,7 +268,7 @@ func (fh *FactorHealth) UpdateScore(backends []scoredBackend) {
268268
fh.updateSnapshot(backends)
269269
}
270270
for i := range backends {
271-
score := fh.caclErrScore(backends[i].Addr())
271+
score := fh.caclErrScore(backends[i].ID())
272272
backends[i].addScore(score, fh.bitNum)
273273
}
274274
}
@@ -281,6 +281,7 @@ func (fh *FactorHealth) updateSnapshot(backends []scoredBackend) {
281281
now := time.Now()
282282
for _, backend := range backends {
283283
addr := backend.Addr()
284+
key := backend.ID()
284285
// Get the current value range.
285286
updatedTime, valueRange, indicator, failureValue, totalValue := time.Time{}, valueRangeNormal, "", 0.0, 0.0
286287
for _, ind := range fh.indicators {
@@ -310,7 +311,7 @@ func (fh *FactorHealth) updateSnapshot(backends []scoredBackend) {
310311
}
311312
}
312313
// If the metric is unavailable, try to reuse the latest one.
313-
snapshot := fh.snapshot[addr]
314+
snapshot := fh.snapshot[key]
314315
if updatedTime.IsZero() {
315316
continue
316317
}
@@ -335,7 +336,7 @@ func (fh *FactorHealth) updateSnapshot(backends []scoredBackend) {
335336
zap.Float64("balance_count", balanceCount),
336337
zap.Int("conn_score", backend.ConnScore()))
337338
}
338-
fh.snapshot[addr] = healthBackendSnapshot{
339+
fh.snapshot[key] = healthBackendSnapshot{
339340
updatedTime: updatedTime,
340341
valueRange: valueRange,
341342
indicator: indicator,
@@ -391,9 +392,9 @@ func calcValueRange(failureSample, totalSample *model.Sample, indicator errIndic
391392
return failureValue, totalValue, valueRangeMid
392393
}
393394

394-
func (fh *FactorHealth) caclErrScore(addr string) int {
395+
func (fh *FactorHealth) caclErrScore(key string) int {
395396
// If the backend has no metrics (not in snapshot), take it as healthy.
396-
return int(fh.snapshot[addr].valueRange)
397+
return int(fh.snapshot[key].valueRange)
397398
}
398399

399400
func (fh *FactorHealth) ScoreBitNum() int {
@@ -402,9 +403,9 @@ func (fh *FactorHealth) ScoreBitNum() int {
402403

403404
func (fh *FactorHealth) BalanceCount(from, to scoredBackend) (BalanceAdvice, float64, []zap.Field) {
404405
// Only migrate connections when one is valueRangeNormal and the other is valueRangeAbnormal.
405-
fromScore := fh.caclErrScore(from.Addr())
406-
toScore := fh.caclErrScore(to.Addr())
407-
snapshot := fh.snapshot[from.Addr()]
406+
fromScore := fh.caclErrScore(from.ID())
407+
toScore := fh.caclErrScore(to.ID())
408+
snapshot := fh.snapshot[from.ID()]
408409
var fields []zap.Field
409410
if snapshot.indicator != "" {
410411
fields = append(fields,

pkg/balance/factor/factor_health_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,40 @@ func TestHealthBalanceCount(t *testing.T) {
310310
}
311311
}
312312

313+
func TestHealthSnapshotUsesBackendID(t *testing.T) {
314+
backends := []scoredBackend{
315+
createBackendWithAddrID("shared:4000", "cluster-a/shared:4000", "10.0.0.1", 10080, 10, 10),
316+
createBackendWithAddrID("shared:4000", "cluster-b/shared:4000", "10.0.0.2", 10080, 20, 20),
317+
}
318+
mmr := &mockMetricsReader{
319+
qrs: map[string]metricsreader.QueryResult{
320+
"failure_pd": {
321+
UpdateTime: time.Now(),
322+
Value: model.Vector([]*model.Sample{
323+
createSampleForInstance(0, "10.0.0.1:10080"),
324+
createSampleForInstance(100, "10.0.0.2:10080"),
325+
}),
326+
},
327+
"total_pd": {
328+
UpdateTime: time.Now(),
329+
Value: model.Vector([]*model.Sample{
330+
createSampleForInstance(100, "10.0.0.1:10080"),
331+
createSampleForInstance(100, "10.0.0.2:10080"),
332+
}),
333+
},
334+
},
335+
}
336+
fh := NewFactorHealth(mmr, zap.NewNop())
337+
338+
fh.UpdateScore(backends)
339+
340+
require.Len(t, fh.snapshot, 2)
341+
require.Contains(t, fh.snapshot, "cluster-a/shared:4000")
342+
require.Contains(t, fh.snapshot, "cluster-b/shared:4000")
343+
require.Equal(t, valueRangeNormal, fh.snapshot["cluster-a/shared:4000"].valueRange)
344+
require.Equal(t, valueRangeAbnormal, fh.snapshot["cluster-b/shared:4000"].valueRange)
345+
}
346+
313347
func TestHealthQueryRule(t *testing.T) {
314348
tests := []struct {
315349
text string

pkg/balance/factor/factor_memory.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -107,13 +107,13 @@ type FactorMemory struct {
107107
snapshot map[string]memBackendSnapshot
108108
// The updated time of the metric that we've read last time.
109109
lastMetricTime time.Time
110-
mr metricsreader.MetricsReader
110+
mr metricsreader.MetricsQuerier
111111
bitNum int
112112
migrationsPerSecond float64
113113
lg *zap.Logger
114114
}
115115

116-
func NewFactorMemory(mr metricsreader.MetricsReader, lg *zap.Logger) *FactorMemory {
116+
func NewFactorMemory(mr metricsreader.MetricsQuerier, lg *zap.Logger) *FactorMemory {
117117
bitNum := 0
118118
for levels := len(oomRiskLevels); ; bitNum++ {
119119
if levels == 0 {
@@ -155,9 +155,9 @@ func (fm *FactorMemory) UpdateScore(backends []scoredBackend) {
155155
}
156156

157157
for i := range backends {
158-
addr := backends[i].Addr()
158+
key := backends[i].ID()
159159
// If the backend is new or the backend misses metrics, take it safe.
160-
score := fm.snapshot[addr].riskLevel
160+
score := fm.snapshot[key].riskLevel
161161
backends[i].addScore(score, fm.bitNum)
162162
}
163163
}
@@ -170,7 +170,8 @@ func (fm *FactorMemory) updateSnapshot(qr metricsreader.QueryResult, backends []
170170
now := time.Now()
171171
for _, backend := range backends {
172172
addr := backend.Addr()
173-
snapshot := fm.snapshot[addr]
173+
key := backend.ID()
174+
snapshot := fm.snapshot[key]
174175
// If a backend exists in metrics but not in the backend list, ignore it for this round.
175176
// The backend will be in the next round if it's healthy.
176177
pairs := qr.GetSamplePair4Backend(backend)
@@ -202,7 +203,7 @@ func (fm *FactorMemory) updateSnapshot(qr metricsreader.QueryResult, backends []
202203
zap.Float64("balance_count", balanceCount),
203204
zap.Int("conn_score", backend.ConnScore()))
204205
}
205-
fm.snapshot[addr] = memBackendSnapshot{
206+
fm.snapshot[key] = memBackendSnapshot{
206207
updatedTime: updateTime,
207208
memUsage: latestUsage,
208209
timeToOOM: timeToOOM,
@@ -265,7 +266,7 @@ func (fm *FactorMemory) calcBalanceCount(backend scoredBackend, riskLevel int, t
265266
}
266267
balanceCount := float64(backend.ConnScore()) / seconds
267268
// If the migration started eariler, reuse the balance count.
268-
if snapshot := fm.snapshot[backend.Addr()]; snapshot.balanceCount > balanceCount {
269+
if snapshot := fm.snapshot[backend.ID()]; snapshot.balanceCount > balanceCount {
269270
return snapshot.balanceCount
270271
}
271272
return balanceCount
@@ -279,8 +280,8 @@ func (fm *FactorMemory) BalanceCount(from, to scoredBackend) (BalanceAdvice, flo
279280
// The risk level may change frequently, e.g. last time timeToOOM was 30s and connections were migrated away,
280281
// then this time it becomes 60s and the connections are migrated back.
281282
// So we only rebalance when the difference of risk levels of 2 backends is big enough.
282-
fromSnapshot := fm.snapshot[from.Addr()]
283-
toSnapshot := fm.snapshot[to.Addr()]
283+
fromSnapshot := fm.snapshot[from.ID()]
284+
toSnapshot := fm.snapshot[to.ID()]
284285
fields := []zap.Field{
285286
zap.Duration("from_time_to_oom", fromSnapshot.timeToOOM),
286287
zap.Float64("from_mem_usage", fromSnapshot.memUsage),

pkg/balance/factor/factor_memory_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,34 @@ func TestMemoryBalanceCount(t *testing.T) {
352352
}
353353
}
354354

355+
func TestMemorySnapshotUsesBackendID(t *testing.T) {
356+
now := model.Now()
357+
backends := []scoredBackend{
358+
createBackendWithAddrID("shared:4000", "cluster-a/shared:4000", "10.0.0.1", 10080, 10, 10),
359+
createBackendWithAddrID("shared:4000", "cluster-b/shared:4000", "10.0.0.2", 10080, 20, 20),
360+
}
361+
mmr := &mockMetricsReader{
362+
qrs: map[string]metricsreader.QueryResult{
363+
"memory": {
364+
UpdateTime: time.Now(),
365+
Value: model.Matrix([]*model.SampleStream{
366+
createSampleStreamForInstance([]float64{0.2, 0.2}, "10.0.0.1:10080", now),
367+
createSampleStreamForInstance([]float64{0.85, 0.85}, "10.0.0.2:10080", now),
368+
}),
369+
},
370+
},
371+
}
372+
fm := NewFactorMemory(mmr, zap.NewNop())
373+
374+
fm.UpdateScore(backends)
375+
376+
require.Len(t, fm.snapshot, 2)
377+
require.Contains(t, fm.snapshot, "cluster-a/shared:4000")
378+
require.Contains(t, fm.snapshot, "cluster-b/shared:4000")
379+
require.Equal(t, 0.2, fm.snapshot["cluster-a/shared:4000"].memUsage)
380+
require.Equal(t, 0.85, fm.snapshot["cluster-b/shared:4000"].memUsage)
381+
}
382+
355383
func TestMemoryQueryRule(t *testing.T) {
356384
tests := []struct {
357385
text string

0 commit comments

Comments
 (0)