Skip to content

Commit 5634bd6

Browse files
authored
dxf: add DXF resource limit config (#68251)
ref #68181
1 parent 00a509d commit 5634bd6

23 files changed

Lines changed: 216 additions & 47 deletions

pkg/config/config.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,14 @@ const (
9797
DefExpensiveTxnTimeThreshold = 600
9898
// DefMemoryUsageAlarmRatio is the threshold triggering an alarm which the memory usage of tidb-server instance exceeds.
9999
DefMemoryUsageAlarmRatio = 0.8
100+
// DefDXFResourceLimit is the default resource percentage available for DXF.
101+
DefDXFResourceLimit = 100
102+
// MinDXFResourceLimit is the minimum resource percentage available for DXF.
103+
// Keep it at 10 while mysql.dist_framework_meta.cpu_count changes from total
104+
// node CPU to usable DXF CPU, so the stored value stays non-zero.
105+
MinDXFResourceLimit = 10
106+
// MaxDXFResourceLimit is the maximum resource percentage available for DXF.
107+
MaxDXFResourceLimit = 100
100108
// DefTempDir is the default temporary directory path for TiDB.
101109
DefTempDir = "/tmp/tidb"
102110
// DefPluginAuditLogBufferSize is the default buffer size for plugin audit log.
@@ -201,6 +209,7 @@ type Config struct {
201209
TiDBEdition string `toml:"tidb-edition" json:"tidb-edition"`
202210
TiDBReleaseVersion string `toml:"tidb-release-version" json:"tidb-release-version"`
203211
DeployMode deploymode.Mode `toml:"deploy-mode" json:"deploy-mode"`
212+
DXFResourceLimit int `toml:"dxf-resource-limit" json:"dxf-resource-limit"`
204213
KeyspaceName string `toml:"keyspace-name" json:"keyspace-name"`
205214
TiKVWorkerURL string `toml:"tikv-worker-url" json:"tikv-worker-url"`
206215
Log Log `toml:"log" json:"log"`
@@ -1041,6 +1050,7 @@ var defaultConf = Config{
10411050
VersionComment: "",
10421051
TiDBReleaseVersion: "",
10431052
DeployMode: deploymode.Premium,
1053+
DXFResourceLimit: DefDXFResourceLimit,
10441054
RUV2: DefaultRUV2Config(),
10451055
Log: Log{
10461056
Level: "info",
@@ -1385,6 +1395,13 @@ func (c *Config) Load(confFile string) error {
13851395
if !kerneltype.IsNextGen() && metaData.IsDefined("deploy-mode") {
13861396
return fmt.Errorf("deploy-mode can only be configured for nextgen TiDB")
13871397
}
1398+
dxfResourceLimitDefined := metaData.IsDefined("dxf-resource-limit")
1399+
if !dxfResourceLimitDefined && c.DXFResourceLimit == 0 {
1400+
c.DXFResourceLimit = DefDXFResourceLimit
1401+
}
1402+
if dxfResourceLimitDefined && c.DeployMode != deploymode.PremiumReserved {
1403+
return fmt.Errorf("dxf-resource-limit can only be configured when deploy-mode is premium_reserved")
1404+
}
13881405
if c.TokenLimit == 0 {
13891406
c.TokenLimit = 1000
13901407
} else if c.TokenLimit > MaxTokenLimit {
@@ -1456,6 +1473,12 @@ func (c *Config) Valid() error {
14561473
if !kerneltype.IsNextGen() && c.DeployMode != deploymode.Premium {
14571474
return fmt.Errorf("deploy-mode can only be configured for nextgen TiDB")
14581475
}
1476+
if c.DXFResourceLimit < MinDXFResourceLimit || c.DXFResourceLimit > MaxDXFResourceLimit {
1477+
return fmt.Errorf("dxf-resource-limit should be between %d and %d", MinDXFResourceLimit, MaxDXFResourceLimit)
1478+
}
1479+
if c.DXFResourceLimit != DefDXFResourceLimit && c.DeployMode != deploymode.PremiumReserved {
1480+
return fmt.Errorf("dxf-resource-limit can only be configured when deploy-mode is premium_reserved")
1481+
}
14591482
if c.Store == StoreTypeMockTiKV && !c.Instance.TiDBEnableDDL.Load() {
14601483
return fmt.Errorf("can't disable DDL on mocktikv")
14611484
}

pkg/config/config.toml.nextgen.example

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ store = "unistore"
1616
# startup and cannot be changed after it is set.
1717
deploy-mode = "premium"
1818

19+
# Limits the local TiDB resources available to DXF tasks, [10, 100].
20+
# This can only be configured when deploy-mode is "premium_reserved".
21+
# dxf-resource-limit = 100
22+
1923
# TiDB storage path.
2024
path = "/tmp/tidb"
2125

pkg/config/config_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1052,6 +1052,7 @@ func TestTxnTotalSizeLimitValid(t *testing.T) {
10521052
func TestDeployModeConfig(t *testing.T) {
10531053
conf := NewConfig()
10541054
require.Equal(t, deploymode.Premium, conf.DeployMode)
1055+
require.Equal(t, DefDXFResourceLimit, conf.DXFResourceLimit)
10551056
require.NoError(t, conf.Valid())
10561057
conf.DeployMode = deploymode.Mode(100)
10571058
require.ErrorContains(t, conf.Valid(), "invalid deploy-mode")
@@ -1061,6 +1062,10 @@ func TestDeployModeConfig(t *testing.T) {
10611062
configFile := filepath.Join(storeDir, "config.toml")
10621063

10631064
if kerneltype.IsClassic() {
1065+
require.NoError(t, os.WriteFile(configFile, []byte(`dxf-resource-limit = 30`), 0644))
1066+
conf = NewConfig()
1067+
require.ErrorContains(t, conf.Load(configFile), "dxf-resource-limit can only be configured when deploy-mode is premium_reserved")
1068+
10641069
require.NoError(t, os.WriteFile(configFile, []byte(`deploy-mode = "premium"`), 0644))
10651070
conf = NewConfig()
10661071
require.ErrorContains(t, conf.Load(configFile), "deploy-mode can only be configured for nextgen TiDB")
@@ -1076,8 +1081,34 @@ func TestDeployModeConfig(t *testing.T) {
10761081
conf = NewConfig()
10771082
require.NoError(t, conf.Load(configFile))
10781083
require.Equal(t, deploymode.PremiumReserved, conf.DeployMode)
1084+
require.Equal(t, DefDXFResourceLimit, conf.DXFResourceLimit)
1085+
require.NoError(t, conf.Valid())
1086+
1087+
require.NoError(t, os.WriteFile(configFile, []byte(`deploy-mode = "premium_reserved"
1088+
dxf-resource-limit = 30`), 0644))
1089+
conf = NewConfig()
1090+
require.NoError(t, conf.Load(configFile))
1091+
require.Equal(t, deploymode.PremiumReserved, conf.DeployMode)
1092+
require.Equal(t, 30, conf.DXFResourceLimit)
10791093
require.NoError(t, conf.Valid())
10801094

1095+
require.NoError(t, os.WriteFile(configFile, []byte(`deploy-mode = "premium"
1096+
dxf-resource-limit = 100`), 0644))
1097+
conf = NewConfig()
1098+
require.ErrorContains(t, conf.Load(configFile), "dxf-resource-limit can only be configured when deploy-mode is premium_reserved")
1099+
1100+
require.NoError(t, os.WriteFile(configFile, []byte(`deploy-mode = "premium_reserved"
1101+
dxf-resource-limit = 9`), 0644))
1102+
conf = NewConfig()
1103+
require.NoError(t, conf.Load(configFile))
1104+
require.ErrorContains(t, conf.Valid(), "dxf-resource-limit should be between 10 and 100")
1105+
1106+
require.NoError(t, os.WriteFile(configFile, []byte(`deploy-mode = "premium_reserved"
1107+
dxf-resource-limit = 101`), 0644))
1108+
conf = NewConfig()
1109+
require.NoError(t, conf.Load(configFile))
1110+
require.ErrorContains(t, conf.Valid(), "dxf-resource-limit should be between 10 and 100")
1111+
10811112
require.NoError(t, os.WriteFile(configFile, []byte(`deploy-mode = "starter"`), 0644))
10821113
conf = NewConfig()
10831114
require.NoError(t, conf.Load(configFile))

pkg/ddl/backfilling_dist_scheduler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -774,7 +774,7 @@ func getRangeSplitter(
774774
logger.Warn("fail to get region split keys and size", zap.Error(err))
775775
}
776776
}
777-
nodeRc := handle.GetNodeResource()
777+
nodeRc := diststorage.GetNodeResource()
778778
rangeSize, rangeKeys := external.CalRangeSize(nodeRc.TotalMem/int64(nodeRc.TotalCPU), regionSplitSize, regionSplitKeys)
779779
logutil.DDLIngestLogger().Info("split kv range with split size and keys",
780780
zap.Int64("region-split-size", regionSplitSize),

pkg/domain/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ go_library(
3636
"//pkg/domain/infosync",
3737
"//pkg/domain/metrics",
3838
"//pkg/domain/sqlsvrapi",
39-
"//pkg/dxf/framework/handle",
4039
"//pkg/dxf/framework/metering",
4140
"//pkg/dxf/framework/proto",
4241
"//pkg/dxf/framework/scheduler",

pkg/domain/domain.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ import (
5050
"github.com/pingcap/tidb/pkg/domain/globalconfigsync"
5151
"github.com/pingcap/tidb/pkg/domain/infosync"
5252
"github.com/pingcap/tidb/pkg/domain/sqlsvrapi"
53-
disthandle "github.com/pingcap/tidb/pkg/dxf/framework/handle"
5453
"github.com/pingcap/tidb/pkg/dxf/framework/metering"
5554
"github.com/pingcap/tidb/pkg/dxf/framework/proto"
5655
"github.com/pingcap/tidb/pkg/dxf/framework/scheduler"
@@ -1127,7 +1126,7 @@ func (do *Domain) InitDistTaskLoop() error {
11271126
if err != nil {
11281127
return err
11291128
}
1130-
disthandle.SetNodeResource(nodeRes)
1129+
storage.SetNodeResource(nodeRes)
11311130
executorManager, err := taskexecutor.NewManager(managerCtx, do.store, serverID, taskManager, nodeRes)
11321131
if err != nil {
11331132
return err
@@ -1175,11 +1174,16 @@ func calculateNodeResource() (*proto.NodeResource, error) {
11751174
} else {
11761175
totalDisk = sz.Capacity
11771176
}
1177+
nodeRes := proto.NewNodeResource(totalCPU, int64(totalMem), totalDisk)
1178+
dxfNodeRes := nodeRes.LimitDXFResource(cfg.DXFResourceLimit)
11781179
logger.Info("initialize node resource",
11791180
zap.Int("total-cpu", totalCPU),
11801181
zap.String("total-mem", units.BytesSize(float64(totalMem))),
1182+
zap.Int("dxf-resource-limit", cfg.DXFResourceLimit),
1183+
zap.Int("dxf-usable-cpu", dxfNodeRes.TotalCPU),
1184+
zap.String("dxf-usable-mem", units.BytesSize(float64(dxfNodeRes.TotalMem))),
11811185
zap.String("total-disk", units.BytesSize(float64(totalDisk))))
1182-
return proto.NewNodeResource(totalCPU, int64(totalMem), totalDisk), nil
1186+
return dxfNodeRes, nil
11831187
}
11841188

11851189
func (do *Domain) distTaskFrameworkLoop(ctx context.Context, taskManager *storage.TaskManager, executorManager *taskexecutor.Manager, serverID string, nodeRes *proto.NodeResource) {

pkg/dxf/framework/handle/BUILD.bazel

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ go_library(
2626
"//pkg/sessionctx",
2727
"//pkg/sessionctx/vardef",
2828
"//pkg/util/backoff",
29-
"//pkg/util/cpu",
3029
"//pkg/util/disttask",
3130
"//pkg/util/logutil",
3231
"//pkg/util/sem/compat",
@@ -35,7 +34,6 @@ go_library(
3534
"@com_github_pingcap_errors//:errors",
3635
"@com_github_pingcap_failpoint//:failpoint",
3736
"@com_github_tikv_client_go_v2//util",
38-
"@org_uber_go_atomic//:atomic",
3937
"@org_uber_go_zap//:zap",
4038
],
4139
)
@@ -56,6 +54,7 @@ go_test(
5654
"//pkg/dxf/framework/proto",
5755
"//pkg/dxf/framework/schstatus",
5856
"//pkg/dxf/framework/storage",
57+
"//pkg/dxf/framework/testutil",
5958
"//pkg/kv",
6059
"//pkg/meta",
6160
"//pkg/sessionctx",

pkg/dxf/framework/handle/handle.go

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ import (
4646
"github.com/pingcap/tidb/pkg/util/sem/compat"
4747
"github.com/pingcap/tidb/pkg/util/sqlexec"
4848
"github.com/tikv/client-go/v2/util"
49-
"go.uber.org/atomic"
5049
"go.uber.org/zap"
5150
)
5251

@@ -292,18 +291,6 @@ func RunWithRetry(
292291
return lastErr
293292
}
294293

295-
var nodeResource atomic.Pointer[proto.NodeResource]
296-
297-
// GetNodeResource gets the node resource.
298-
func GetNodeResource() *proto.NodeResource {
299-
return nodeResource.Load()
300-
}
301-
302-
// SetNodeResource gets the node resource.
303-
func SetNodeResource(rc *proto.NodeResource) {
304-
nodeResource.Store(rc)
305-
}
306-
307294
// GetDefaultRegionSplitConfig gets the default region split size and keys.
308295
func GetDefaultRegionSplitConfig() (splitSize, splitKeys int64) {
309296
if kerneltype.IsNextGen() {
@@ -473,9 +460,3 @@ func SendRowAndSizeMeterData(ctx context.Context, task *proto.Task, rows int64,
473460
failpoint.InjectCall("afterSendRowAndSizeMeterData", item)
474461
return nil
475462
}
476-
477-
func init() {
478-
// domain will init this var at runtime, we store it here for test, as some
479-
// test might not start domain.
480-
nodeResource.Store(proto.NewNodeResource(8, 16*units.GiB, 100*units.GiB))
481-
}

pkg/dxf/framework/handle/status.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@ import (
2727
"github.com/pingcap/tidb/pkg/kv"
2828
"github.com/pingcap/tidb/pkg/meta/tidbvar"
2929
"github.com/pingcap/tidb/pkg/sessionctx"
30-
"github.com/pingcap/tidb/pkg/util/cpu"
3130
disttaskutil "github.com/pingcap/tidb/pkg/util/disttask"
3231
"github.com/pingcap/tidb/pkg/util/logutil"
3332
"github.com/pingcap/tidb/pkg/util/sqlexec"
3433
"github.com/tikv/client-go/v2/util"
34+
"go.uber.org/zap"
3535
)
3636

3737
// GetScheduleStatus returns the schedule status.
@@ -115,8 +115,9 @@ func GetNodesInfo(ctx context.Context, manager *storage.TaskManager) (nodeCount
115115
if len(nodes) == 0 {
116116
// shouldn't happen normally as every node will register itself to the meta
117117
// table.
118-
logutil.BgLogger().Warn("no managed nodes found, use local node CPU count instead")
119-
cpuCount = cpu.GetCPUCount()
118+
cpuCount = storage.GetDXFCPUCount()
119+
logutil.BgLogger().Warn("no managed nodes found, use local usable CPU count instead",
120+
zap.Int("usableCPUCount", cpuCount))
120121
} else {
121122
cpuCount = nodes[0].CPUCount
122123
}

pkg/dxf/framework/handle/status_testkit_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/pingcap/tidb/pkg/dxf/framework/proto"
2727
"github.com/pingcap/tidb/pkg/dxf/framework/schstatus"
2828
"github.com/pingcap/tidb/pkg/dxf/framework/storage"
29+
"github.com/pingcap/tidb/pkg/dxf/framework/testutil"
2930
"github.com/pingcap/tidb/pkg/kv"
3031
"github.com/pingcap/tidb/pkg/meta"
3132
"github.com/pingcap/tidb/pkg/sessionctx"
@@ -59,7 +60,7 @@ func TestGetScheduleStatus(t *testing.T) {
5960

6061
func TestNodeInfoAndBusyNodes(t *testing.T) {
6162
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/domain/MockDisableDistTask", "return(true)")
62-
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(16)")
63+
testutil.MockNodeResource(t, 16)
6364
store := testkit.CreateMockStore(t)
6465
gtk := testkit.NewTestKit(t, store)
6566
pool := pools.NewResourcePool(func() (pools.Resource, error) {

0 commit comments

Comments
 (0)