Skip to content

Commit 6953d36

Browse files
authored
disttask: fix potential block on DXF scheduler (pingcap#59945)
close pingcap#59944
1 parent 91a622f commit 6953d36

File tree

3 files changed

+39
-1
lines changed

3 files changed

+39
-1
lines changed

pkg/disttask/framework/scheduler/scheduler_manager.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,10 @@ func (sm *Manager) startScheduler(basicTask *proto.TaskBase, allocateSlots bool,
359359
}()
360360
metrics.UpdateMetricsForRunTask(task)
361361
scheduler.ScheduleTask()
362-
sm.finishCh <- struct{}{}
362+
select {
363+
case sm.finishCh <- struct{}{}:
364+
default:
365+
}
363366
})
364367
}
365368

@@ -385,6 +388,7 @@ func (sm *Manager) cleanupTaskLoop() {
385388
//
386389
// tasks with global sort should clean up tmp files stored on S3.
387390
func (sm *Manager) doCleanupTask() {
391+
failpoint.InjectCall("doCleanupTask")
388392
tasks, err := sm.taskMgr.GetTasksInStates(
389393
sm.ctx,
390394
proto.TaskStateFailed,

tests/realtikvtest/addindextest1/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ go_test(
1111
deps = [
1212
"//pkg/config",
1313
"//pkg/ddl/ingest",
14+
"//pkg/disttask/framework/proto",
1415
"//pkg/disttask/framework/storage",
1516
"//pkg/disttask/framework/taskexecutor",
1617
"//pkg/errno",

tests/realtikvtest/addindextest1/disttask_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/pingcap/failpoint"
3131
"github.com/pingcap/tidb/pkg/config"
3232
"github.com/pingcap/tidb/pkg/ddl/ingest"
33+
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
3334
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
3435
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor"
3536
"github.com/pingcap/tidb/pkg/errno"
@@ -462,3 +463,35 @@ func TestAddIndexScheduleAway(t *testing.T) {
462463
tk.MustExec("alter table t add index idx(b);")
463464
require.NotEqual(t, int64(0), jobID.Load())
464465
}
466+
467+
func TestAddIndexDistCleanUpBlock(t *testing.T) {
468+
proto.MaxConcurrentTask = 1
469+
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", `return(1)`)
470+
ch := make(chan struct{})
471+
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/doCleanupTask", func() {
472+
<-ch
473+
})
474+
store := realtikvtest.CreateMockStoreAndSetup(t)
475+
if store.Name() != "TiKV" {
476+
t.Skip("TiKV store only")
477+
}
478+
479+
tk := testkit.NewTestKit(t, store)
480+
tk.MustExec("drop database if exists test;")
481+
tk.MustExec("create database test;")
482+
tk.MustExec("use test;")
483+
tk.MustExec(`set global tidb_enable_dist_task=1;`)
484+
var wg sync.WaitGroup
485+
for i := 0; i < 4; i++ {
486+
wg.Add(1)
487+
go func() {
488+
defer wg.Done()
489+
tk := testkit.NewTestKit(t, store)
490+
tk.MustExec(fmt.Sprintf("create table test.t%d (a int, b int);", i))
491+
tk.MustExec(fmt.Sprintf("insert into test.t%d values (1, 1);", i))
492+
tk.MustExec(fmt.Sprintf("alter table test.t%d add index idx(b);", i))
493+
}()
494+
}
495+
wg.Wait()
496+
close(ch)
497+
}

0 commit comments

Comments
 (0)