Skip to content

Commit 5fba0fa

Browse files
committed
global-sort: accelerate adding unique index using global sort
1 parent 0007a6c commit 5fba0fa

File tree

5 files changed

+110
-10
lines changed

5 files changed

+110
-10
lines changed

pkg/ddl/backfilling_dist_scheduler.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -295,10 +295,7 @@ func generateNonPartitionPlan(
295295
regionBatch := calculateRegionBatch(len(recordRegionMetas), instanceCnt, !useCloud)
296296

297297
for i := 0; i < len(recordRegionMetas); i += regionBatch {
298-
end := i + regionBatch
299-
if end > len(recordRegionMetas) {
300-
end = len(recordRegionMetas)
301-
}
298+
end := min(i+regionBatch, len(recordRegionMetas))
302299
batch := recordRegionMetas[i:end]
303300
subTaskMeta := &BackfillSubTaskMeta{
304301
RowStart: batch[0].StartKey(),
@@ -328,6 +325,9 @@ func generateNonPartitionPlan(
328325
}
329326

330327
func calculateRegionBatch(totalRegionCnt int, instanceCnt int, useLocalDisk bool) int {
328+
failpoint.Inject("mockRegionBatch", func(val failpoint.Value) {
329+
failpoint.Return(val.(int))
330+
})
331331
var regionBatch int
332332
avgTasksPerInstance := (totalRegionCnt + instanceCnt - 1) / instanceCnt // ceiling
333333
if useLocalDisk {

pkg/ddl/index.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1956,6 +1956,9 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error {
19561956
}
19571957
//nolint:forcetypeassert
19581958
discovery := w.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
1959+
if reorgInfo.ReorgMeta.UseCloudStorage {
1960+
return nil
1961+
}
19591962
return checkDuplicateForUniqueIndex(w.ctx, t, reorgInfo, discovery)
19601963
}
19611964
}

pkg/disttask/framework/taskexecutor/task_executor.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ var (
6363
MockTiDBDown func(execID string, task *proto.TaskBase) bool
6464
)
6565

66+
var (
67+
// GetErrorSubtask4Test is used for UT to collect error
68+
GetErrorSubtask4Test atomic.Pointer[proto.TaskBase]
69+
)
70+
6671
// BaseTaskExecutor is the base implementation of TaskExecutor.
6772
type BaseTaskExecutor struct {
6873
// id, it's the same as server id now, i.e. host:port.
@@ -537,6 +542,9 @@ func (e *BaseTaskExecutor) onError(err error) {
537542
if err == nil {
538543
return
539544
}
545+
failpoint.Inject("collectTaskError", func() {
546+
GetErrorSubtask4Test.CompareAndSwap(nil, e.GetTaskBase())
547+
})
540548
err = errors.Trace(err)
541549
e.logger.Error("onError", zap.Error(err), zap.Stack("stack"))
542550
e.mu.Lock()

tests/realtikvtest/addindextest2/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,19 @@ go_test(
1111
deps = [
1212
"//br/pkg/storage",
1313
"//pkg/config",
14+
"//pkg/ddl",
1415
"//pkg/ddl/util/callback",
16+
"//pkg/disttask/framework/proto",
1517
"//pkg/disttask/framework/scheduler",
18+
"//pkg/disttask/framework/taskexecutor",
1619
"//pkg/kv",
1720
"//pkg/lightning/backend/external",
1821
"//pkg/parser/model",
1922
"//pkg/sessionctx/variable",
2023
"//pkg/store/helper",
2124
"//pkg/tablecodec",
2225
"//pkg/testkit",
26+
"//pkg/testkit/testfailpoint",
2327
"//pkg/types",
2428
"//tests/realtikvtest",
2529
"@com_github_fsouza_fake_gcs_server//fakestorage",

tests/realtikvtest/addindextest2/global_sort_test.go

Lines changed: 91 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"fmt"
2020
"strconv"
2121
"strings"
22+
"sync/atomic"
2223
"testing"
2324
"time"
2425

@@ -27,15 +28,19 @@ import (
2728
"github.com/pingcap/failpoint"
2829
"github.com/pingcap/tidb/br/pkg/storage"
2930
"github.com/pingcap/tidb/pkg/config"
31+
"github.com/pingcap/tidb/pkg/ddl"
3032
"github.com/pingcap/tidb/pkg/ddl/util/callback"
33+
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
3134
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
35+
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor"
3236
"github.com/pingcap/tidb/pkg/kv"
3337
"github.com/pingcap/tidb/pkg/lightning/backend/external"
3438
"github.com/pingcap/tidb/pkg/parser/model"
3539
"github.com/pingcap/tidb/pkg/sessionctx/variable"
3640
"github.com/pingcap/tidb/pkg/store/helper"
3741
"github.com/pingcap/tidb/pkg/tablecodec"
3842
"github.com/pingcap/tidb/pkg/testkit"
43+
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
3944
"github.com/pingcap/tidb/pkg/types"
4045
"github.com/pingcap/tidb/tests/realtikvtest"
4146
"github.com/stretchr/testify/require"
@@ -261,16 +266,96 @@ func TestGlobalSortDuplicateErrMsg(t *testing.T) {
261266
tk.MustExec(`set @@global.tidb_ddl_enable_fast_reorg = 1;`)
262267
tk.MustExec("set @@global.tidb_enable_dist_task = 1;")
263268
tk.MustExec(fmt.Sprintf(`set @@global.tidb_cloud_storage_uri = "%s"`, cloudStorageURI))
264-
defer func() {
269+
atomic.StoreUint32(&ddl.EnableSplitTableRegion, 1)
270+
tk.MustExec("set @@global.tidb_scatter_region = on")
271+
t.Cleanup(func() {
265272
tk.MustExec("set @@global.tidb_enable_dist_task = 0;")
266273
variable.CloudStorageURI.Store("")
267-
}()
274+
atomic.StoreUint32(&ddl.EnableSplitTableRegion, 0)
275+
tk.MustExec("set @@global.tidb_scatter_region = default")
276+
})
277+
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/collectTaskError", "return(true)")
278+
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/mockRegionBatch", `return(1)`)
279+
280+
testcases := []struct {
281+
caseName string
282+
createTableSQL string
283+
splitTableSQL string
284+
initDataSQL string
285+
addUniqueKeySQL string
286+
errMsg string
287+
}{
288+
{
289+
"int index",
290+
"create table t (a int, b int, c int);",
291+
"",
292+
"insert into t values (1, 1, 1), (2, 1, 2);",
293+
"alter table t add unique index idx(b);",
294+
"Duplicate entry '1' for key 't.idx",
295+
},
296+
{
297+
"int index on multi regions",
298+
"create table t (a int primary key, b int);",
299+
"split table t between (0) and (4000) regions 4;",
300+
"insert into t values (1, 1), (1001, 1), (2001, 2001), (4001, 1);",
301+
"alter table t add unique index idx(b);",
302+
"[kv:1062]Duplicate entry '1' for key 't.idx'",
303+
},
304+
{
305+
"varchar index",
306+
"create table t (id int, data varchar(255));",
307+
"",
308+
"insert into t values (1, '1'), (2, '1');",
309+
"alter table t add unique index i(data);",
310+
"[kv:1062]Duplicate entry '1' for key 't.i'",
311+
},
312+
{
313+
"combined index",
314+
"create table t (id int, data varchar(255));",
315+
"",
316+
"insert into t values (1, '1'), (1, '1');",
317+
"alter table t add unique index i(id, data);",
318+
"[kv:1062]Duplicate entry '1-1' for key 't.i'",
319+
},
320+
{
321+
"multi value index",
322+
"create table t (id int, data json);",
323+
"",
324+
`insert into t values (1, '{"code":[1,1]}'), (2, '{"code":[1,1]}');`,
325+
"alter table t add unique index zips( (CAST(data->'$.code' AS UNSIGNED ARRAY)));",
326+
"Duplicate entry '1' for key 't.zips",
327+
},
328+
}
268329

269-
tk.MustExec("create table t (a int, b int, c int);")
270-
tk.MustExec("insert into t values (1, 1, 1);")
271-
tk.MustExec("insert into t values (2, 1, 2);")
330+
checkSubtaskErr := func(t *testing.T) {
331+
errorSubtask := taskexecutor.GetErrorSubtask4Test.Swap(nil)
332+
require.NotEmpty(t, errorSubtask)
333+
require.Equal(t, proto.BackfillStepWriteAndIngest, errorSubtask.Step)
334+
}
335+
336+
for _, tc := range testcases {
337+
t.Run(tc.caseName, func(tt *testing.T) {
338+
// init
339+
taskexecutor.GetErrorSubtask4Test.Store(nil)
340+
tk.MustExec(tc.createTableSQL)
341+
tk.MustExec(tc.initDataSQL)
342+
tt.Cleanup(func() {
343+
tk.MustExec("drop table if exists t")
344+
})
345+
346+
// pre-check
347+
if len(tc.splitTableSQL) > 0 {
348+
tk.MustQuery(tc.splitTableSQL).Check(testkit.Rows("3 1"))
349+
}
350+
if strings.Contains(tc.createTableSQL, "partition") {
351+
rs := tk.MustQuery("show table t regions")
352+
require.Len(tt, rs.Rows(), 2)
353+
}
272354

273-
tk.MustGetErrMsg("alter table t add unique index idx(b);", "[kv:1062]Duplicate entry '1' for key 't.idx'")
355+
tk.MustContainErrMsg(tc.addUniqueKeySQL, tc.errMsg)
356+
checkSubtaskErr(tt)
357+
})
358+
}
274359
}
275360

276361
func TestIngestUseGivenTS(t *testing.T) {

0 commit comments

Comments
 (0)