Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion lib/config/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ var (
ErrInvalidConfigValue = errors.New("invalid config value")
)

const DefaultBackendClusterName = "default"

type Config struct {
Proxy ProxyServer `yaml:"proxy,omitempty" toml:"proxy,omitempty" json:"proxy,omitempty"`
API API `yaml:"api,omitempty" toml:"api,omitempty" json:"api,omitempty"`
Expand Down Expand Up @@ -249,7 +251,7 @@ func (cfg *Config) GetBackendClusters() []BackendCluster {
return nil
}
return []BackendCluster{{
Name: "default",
Name: DefaultBackendClusterName,
PDAddrs: cfg.Proxy.PDAddrs,
}}
}
Expand Down
2 changes: 1 addition & 1 deletion lib/config/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func TestGetBackendClusters(t *testing.T) {

clusters := cfg.GetBackendClusters()
require.Len(t, clusters, 1)
require.Equal(t, "default", clusters[0].Name)
require.Equal(t, DefaultBackendClusterName, clusters[0].Name)
require.Equal(t, cfg.Proxy.PDAddrs, clusters[0].PDAddrs)

cfg.Proxy.BackendClusters = []BackendCluster{
Expand Down
4 changes: 2 additions & 2 deletions pkg/balance/factor/factor_balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type FactorBasedBalance struct {
factors []Factor
// to reduce memory allocation
cachedList []scoredBackend
mr metricsreader.MetricsReader
mr metricsreader.MetricsQuerier
lg *zap.Logger
factorStatus *FactorStatus
factorLabel *FactorLabel
Expand All @@ -44,7 +44,7 @@ type FactorBasedBalance struct {
routePolicy string
}

func NewFactorBasedBalance(lg *zap.Logger, mr metricsreader.MetricsReader) *FactorBasedBalance {
func NewFactorBasedBalance(lg *zap.Logger, mr metricsreader.MetricsQuerier) *FactorBasedBalance {
return &FactorBasedBalance{
lg: lg,
mr: mr,
Expand Down
15 changes: 8 additions & 7 deletions pkg/balance/factor/factor_cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,13 @@ type FactorCPU struct {
lastMetricTime time.Time
// The estimated average CPU usage used by one connection.
usagePerConn float64
mr metricsreader.MetricsReader
mr metricsreader.MetricsQuerier
bitNum int
migrationsPerSecond float64
lg *zap.Logger
}

func NewFactorCPU(mr metricsreader.MetricsReader, lg *zap.Logger) *FactorCPU {
func NewFactorCPU(mr metricsreader.MetricsQuerier, lg *zap.Logger) *FactorCPU {
fc := &FactorCPU{
mr: mr,
bitNum: 5,
Expand Down Expand Up @@ -146,6 +146,7 @@ func (fc *FactorCPU) updateSnapshot(qr metricsreader.QueryResult, backends []sco
now := time.Now()
for _, backend := range backends {
addr := backend.Addr()
key := backend.ID()
// If a backend exists in metrics but not in the backend list, ignore it for this round.
// The backend will be in the next round if it's healthy.
pairs := qr.GetSamplePair4Backend(backend)
Expand All @@ -155,7 +156,7 @@ func (fc *FactorCPU) updateSnapshot(qr metricsreader.QueryResult, backends []sco
updateTime := time.UnixMilli(int64(pairs[len(pairs)-1].Timestamp))
// The time point of updating each backend is different, so only a portion of the backends is updated every time.
// If this backend is not updated, ignore it.
snapshot := fc.snapshot[addr]
snapshot := fc.snapshot[key]
if !snapshot.updatedTime.Before(updateTime) {
continue
}
Expand All @@ -164,7 +165,7 @@ func (fc *FactorCPU) updateSnapshot(qr metricsreader.QueryResult, backends []sco
continue
}
metrics.BackendMetricGauge.WithLabelValues(addr, "cpu").Set(avgUsage)
fc.snapshot[addr] = cpuBackendSnapshot{
fc.snapshot[key] = cpuBackendSnapshot{
avgUsage: avgUsage,
latestUsage: latestUsage,
connCount: backend.ConnCount(),
Expand Down Expand Up @@ -239,7 +240,7 @@ func (fc *FactorCPU) updateCpuPerConn() {

// Estimate the current cpu usage by the latest CPU usage, the latest connection count, and the current connection count.
func (fc *FactorCPU) getUsage(backend scoredBackend) (avgUsage, latestUsage float64) {
snapshot, ok := fc.snapshot[backend.Addr()]
snapshot, ok := fc.snapshot[backend.ID()]
if !ok || snapshot.avgUsage < 0 || latestUsage < 0 {
// The metric has missed for minutes.
return 1, 1
Expand All @@ -260,11 +261,11 @@ func (fc *FactorCPU) BalanceCount(from, to scoredBackend) (BalanceAdvice, float6
fields := []zap.Field{
zap.Float64("from_avg_usage", fromAvgUsage),
zap.Float64("from_latest_usage", fromLatestUsage),
zap.Int("from_snapshot_conn", fc.snapshot[from.Addr()].connCount),
zap.Int("from_snapshot_conn", fc.snapshot[from.ID()].connCount),
zap.Int("from_conn", from.ConnScore()),
zap.Float64("to_avg_usage", toAvgUsage),
zap.Float64("to_latest_usage", toLatestUsage),
zap.Int("to_snapshot_conn", fc.snapshot[to.Addr()].connCount),
zap.Int("to_snapshot_conn", fc.snapshot[to.ID()].connCount),
zap.Int("to_conn", to.ConnScore()),
zap.Float64("usage_per_conn", fc.usagePerConn),
}
Expand Down
28 changes: 28 additions & 0 deletions pkg/balance/factor/factor_cpu_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,34 @@ func TestCPUResultNotUpdated(t *testing.T) {
}
}

// TestCPUSnapshotUsesBackendID verifies that FactorCPU keys its snapshot map
// by backend ID rather than by SQL address: two backends that share the same
// address ("shared:4000") but carry distinct IDs must each get their own
// snapshot entry holding their own CPU usage.
func TestCPUSnapshotUsesBackendID(t *testing.T) {
now := model.Now()
// Both backends report the same SQL address but distinct IDs and distinct
// metrics (status) addresses, so their metric samples can be told apart.
backends := []scoredBackend{
createBackendWithAddrID("shared:4000", "cluster-a/shared:4000", "10.0.0.1", 10080, 10, 10),
createBackendWithAddrID("shared:4000", "cluster-b/shared:4000", "10.0.0.2", 10080, 20, 20),
}
// Feed distinct CPU usage per metrics address: 0.2 for cluster-a's backend,
// 0.6 for cluster-b's.
mmr := &mockMetricsReader{
qrs: map[string]metricsreader.QueryResult{
"cpu": {
UpdateTime: time.Now(),
Value: model.Matrix([]*model.SampleStream{
createSampleStreamForInstance([]float64{0.2, 0.2}, "10.0.0.1:10080", now),
createSampleStreamForInstance([]float64{0.6, 0.6}, "10.0.0.2:10080", now),
}),
},
},
}
fc := NewFactorCPU(mmr, zap.NewNop())

fc.UpdateScore(backends)

// If the snapshot were keyed by address, the two entries would collide into
// one; keyed by ID they must stay separate with their own usage values.
require.Len(t, fc.snapshot, 2)
require.Contains(t, fc.snapshot, "cluster-a/shared:4000")
require.Contains(t, fc.snapshot, "cluster-b/shared:4000")
require.Equal(t, 0.2, fc.snapshot["cluster-a/shared:4000"].latestUsage)
require.Equal(t, 0.6, fc.snapshot["cluster-b/shared:4000"].latestUsage)
}

func TestCPUQueryRule(t *testing.T) {
tests := []struct {
text string
Expand Down
23 changes: 12 additions & 11 deletions pkg/balance/factor/factor_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,13 @@ type errIndicator struct {
type FactorHealth struct {
snapshot map[string]healthBackendSnapshot
indicators []errIndicator
mr metricsreader.MetricsReader
mr metricsreader.MetricsQuerier
bitNum int
migrationsPerSecond float64
lg *zap.Logger
}

func NewFactorHealth(mr metricsreader.MetricsReader, lg *zap.Logger) *FactorHealth {
func NewFactorHealth(mr metricsreader.MetricsQuerier, lg *zap.Logger) *FactorHealth {
return &FactorHealth{
mr: mr,
snapshot: make(map[string]healthBackendSnapshot),
Expand All @@ -203,7 +203,7 @@ func NewFactorHealth(mr metricsreader.MetricsReader, lg *zap.Logger) *FactorHeal
}
}

func initErrIndicator(mr metricsreader.MetricsReader) []errIndicator {
func initErrIndicator(mr metricsreader.MetricsQuerier) []errIndicator {
indicators := make([]errIndicator, 0, len(errDefinitions))
for _, def := range errDefinitions {
indicator := errIndicator{
Expand Down Expand Up @@ -268,7 +268,7 @@ func (fh *FactorHealth) UpdateScore(backends []scoredBackend) {
fh.updateSnapshot(backends)
}
for i := range backends {
score := fh.caclErrScore(backends[i].Addr())
score := fh.caclErrScore(backends[i].ID())
backends[i].addScore(score, fh.bitNum)
}
}
Expand All @@ -281,6 +281,7 @@ func (fh *FactorHealth) updateSnapshot(backends []scoredBackend) {
now := time.Now()
for _, backend := range backends {
addr := backend.Addr()
key := backend.ID()
// Get the current value range.
updatedTime, valueRange, indicator, failureValue, totalValue := time.Time{}, valueRangeNormal, "", 0.0, 0.0
for _, ind := range fh.indicators {
Expand Down Expand Up @@ -310,7 +311,7 @@ func (fh *FactorHealth) updateSnapshot(backends []scoredBackend) {
}
}
// If the metric is unavailable, try to reuse the latest one.
snapshot := fh.snapshot[addr]
snapshot := fh.snapshot[key]
if updatedTime.IsZero() {
continue
}
Expand All @@ -335,7 +336,7 @@ func (fh *FactorHealth) updateSnapshot(backends []scoredBackend) {
zap.Float64("balance_count", balanceCount),
zap.Int("conn_score", backend.ConnScore()))
}
fh.snapshot[addr] = healthBackendSnapshot{
fh.snapshot[key] = healthBackendSnapshot{
updatedTime: updatedTime,
valueRange: valueRange,
indicator: indicator,
Expand Down Expand Up @@ -391,9 +392,9 @@ func calcValueRange(failureSample, totalSample *model.Sample, indicator errIndic
return failureValue, totalValue, valueRangeMid
}

func (fh *FactorHealth) caclErrScore(addr string) int {
func (fh *FactorHealth) caclErrScore(key string) int {
// If the backend has no metrics (not in snapshot), take it as healthy.
return int(fh.snapshot[addr].valueRange)
return int(fh.snapshot[key].valueRange)
}

func (fh *FactorHealth) ScoreBitNum() int {
Expand All @@ -402,9 +403,9 @@ func (fh *FactorHealth) ScoreBitNum() int {

func (fh *FactorHealth) BalanceCount(from, to scoredBackend) (BalanceAdvice, float64, []zap.Field) {
// Only migrate connections when one is valueRangeNormal and the other is valueRangeAbnormal.
fromScore := fh.caclErrScore(from.Addr())
toScore := fh.caclErrScore(to.Addr())
snapshot := fh.snapshot[from.Addr()]
fromScore := fh.caclErrScore(from.ID())
toScore := fh.caclErrScore(to.ID())
snapshot := fh.snapshot[from.ID()]
var fields []zap.Field
if snapshot.indicator != "" {
fields = append(fields,
Expand Down
34 changes: 34 additions & 0 deletions pkg/balance/factor/factor_health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,40 @@ func TestHealthBalanceCount(t *testing.T) {
}
}

// TestHealthSnapshotUsesBackendID verifies that FactorHealth keys its snapshot
// map by backend ID rather than by SQL address: two backends sharing the same
// address but carrying distinct IDs must get separate snapshot entries with
// their own health value ranges.
func TestHealthSnapshotUsesBackendID(t *testing.T) {
// Both backends report the same SQL address but distinct IDs and distinct
// metrics (status) addresses.
backends := []scoredBackend{
createBackendWithAddrID("shared:4000", "cluster-a/shared:4000", "10.0.0.1", 10080, 10, 10),
createBackendWithAddrID("shared:4000", "cluster-b/shared:4000", "10.0.0.2", 10080, 20, 20),
}
// cluster-a's backend: 0 failures out of 100 -> healthy.
// cluster-b's backend: 100 failures out of 100 -> abnormal.
mmr := &mockMetricsReader{
qrs: map[string]metricsreader.QueryResult{
"failure_pd": {
UpdateTime: time.Now(),
Value: model.Vector([]*model.Sample{
createSampleForInstance(0, "10.0.0.1:10080"),
createSampleForInstance(100, "10.0.0.2:10080"),
}),
},
"total_pd": {
UpdateTime: time.Now(),
Value: model.Vector([]*model.Sample{
createSampleForInstance(100, "10.0.0.1:10080"),
createSampleForInstance(100, "10.0.0.2:10080"),
}),
},
},
}
fh := NewFactorHealth(mmr, zap.NewNop())

fh.UpdateScore(backends)

// Keyed by ID, the two same-address backends must stay separate and keep
// their own value ranges.
require.Len(t, fh.snapshot, 2)
require.Contains(t, fh.snapshot, "cluster-a/shared:4000")
require.Contains(t, fh.snapshot, "cluster-b/shared:4000")
require.Equal(t, valueRangeNormal, fh.snapshot["cluster-a/shared:4000"].valueRange)
require.Equal(t, valueRangeAbnormal, fh.snapshot["cluster-b/shared:4000"].valueRange)
}

func TestHealthQueryRule(t *testing.T) {
tests := []struct {
text string
Expand Down
19 changes: 10 additions & 9 deletions pkg/balance/factor/factor_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,13 @@ type FactorMemory struct {
snapshot map[string]memBackendSnapshot
// The updated time of the metric that we've read last time.
lastMetricTime time.Time
mr metricsreader.MetricsReader
mr metricsreader.MetricsQuerier
bitNum int
migrationsPerSecond float64
lg *zap.Logger
}

func NewFactorMemory(mr metricsreader.MetricsReader, lg *zap.Logger) *FactorMemory {
func NewFactorMemory(mr metricsreader.MetricsQuerier, lg *zap.Logger) *FactorMemory {
bitNum := 0
for levels := len(oomRiskLevels); ; bitNum++ {
if levels == 0 {
Expand Down Expand Up @@ -155,9 +155,9 @@ func (fm *FactorMemory) UpdateScore(backends []scoredBackend) {
}

for i := range backends {
addr := backends[i].Addr()
key := backends[i].ID()
// If the backend is new or the backend misses metrics, take it safe.
score := fm.snapshot[addr].riskLevel
score := fm.snapshot[key].riskLevel
backends[i].addScore(score, fm.bitNum)
}
}
Expand All @@ -170,7 +170,8 @@ func (fm *FactorMemory) updateSnapshot(qr metricsreader.QueryResult, backends []
now := time.Now()
for _, backend := range backends {
addr := backend.Addr()
snapshot := fm.snapshot[addr]
key := backend.ID()
snapshot := fm.snapshot[key]
// If a backend exists in metrics but not in the backend list, ignore it for this round.
// The backend will be in the next round if it's healthy.
pairs := qr.GetSamplePair4Backend(backend)
Expand Down Expand Up @@ -202,7 +203,7 @@ func (fm *FactorMemory) updateSnapshot(qr metricsreader.QueryResult, backends []
zap.Float64("balance_count", balanceCount),
zap.Int("conn_score", backend.ConnScore()))
}
fm.snapshot[addr] = memBackendSnapshot{
fm.snapshot[key] = memBackendSnapshot{
updatedTime: updateTime,
memUsage: latestUsage,
timeToOOM: timeToOOM,
Expand Down Expand Up @@ -265,7 +266,7 @@ func (fm *FactorMemory) calcBalanceCount(backend scoredBackend, riskLevel int, t
}
balanceCount := float64(backend.ConnScore()) / seconds
// If the migration started earlier, reuse the balance count.
if snapshot := fm.snapshot[backend.Addr()]; snapshot.balanceCount > balanceCount {
if snapshot := fm.snapshot[backend.ID()]; snapshot.balanceCount > balanceCount {
return snapshot.balanceCount
}
return balanceCount
Expand All @@ -279,8 +280,8 @@ func (fm *FactorMemory) BalanceCount(from, to scoredBackend) (BalanceAdvice, flo
// The risk level may change frequently, e.g. last time timeToOOM was 30s and connections were migrated away,
// then this time it becomes 60s and the connections are migrated back.
// So we only rebalance when the difference of risk levels of 2 backends is big enough.
fromSnapshot := fm.snapshot[from.Addr()]
toSnapshot := fm.snapshot[to.Addr()]
fromSnapshot := fm.snapshot[from.ID()]
toSnapshot := fm.snapshot[to.ID()]
fields := []zap.Field{
zap.Duration("from_time_to_oom", fromSnapshot.timeToOOM),
zap.Float64("from_mem_usage", fromSnapshot.memUsage),
Expand Down
28 changes: 28 additions & 0 deletions pkg/balance/factor/factor_memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,34 @@ func TestMemoryBalanceCount(t *testing.T) {
}
}

// TestMemorySnapshotUsesBackendID verifies that FactorMemory keys its snapshot
// map by backend ID rather than by SQL address: two backends sharing the same
// address but carrying distinct IDs must get separate snapshot entries with
// their own memory usage.
func TestMemorySnapshotUsesBackendID(t *testing.T) {
now := model.Now()
// Both backends report the same SQL address but distinct IDs and distinct
// metrics (status) addresses.
backends := []scoredBackend{
createBackendWithAddrID("shared:4000", "cluster-a/shared:4000", "10.0.0.1", 10080, 10, 10),
createBackendWithAddrID("shared:4000", "cluster-b/shared:4000", "10.0.0.2", 10080, 20, 20),
}
// Feed distinct memory usage per metrics address: 0.2 for cluster-a's
// backend, 0.85 for cluster-b's.
mmr := &mockMetricsReader{
qrs: map[string]metricsreader.QueryResult{
"memory": {
UpdateTime: time.Now(),
Value: model.Matrix([]*model.SampleStream{
createSampleStreamForInstance([]float64{0.2, 0.2}, "10.0.0.1:10080", now),
createSampleStreamForInstance([]float64{0.85, 0.85}, "10.0.0.2:10080", now),
}),
},
},
}
fm := NewFactorMemory(mmr, zap.NewNop())

fm.UpdateScore(backends)

// Keyed by ID, the two same-address backends must stay separate and keep
// their own memory usage values.
require.Len(t, fm.snapshot, 2)
require.Contains(t, fm.snapshot, "cluster-a/shared:4000")
require.Contains(t, fm.snapshot, "cluster-b/shared:4000")
require.Equal(t, 0.2, fm.snapshot["cluster-a/shared:4000"].memUsage)
require.Equal(t, 0.85, fm.snapshot["cluster-b/shared:4000"].memUsage)
}

func TestMemoryQueryRule(t *testing.T) {
tests := []struct {
text string
Expand Down
Loading