Skip to content

Commit c60e6b4

Browse files
committed
global-sort: accelerate adding unique index using global sort
1 parent 0007a6c commit c60e6b4

File tree

5 files changed

+118
-10
lines changed

5 files changed

+118
-10
lines changed

pkg/ddl/backfilling_dist_scheduler.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -295,10 +295,7 @@ func generateNonPartitionPlan(
295295
regionBatch := calculateRegionBatch(len(recordRegionMetas), instanceCnt, !useCloud)
296296

297297
for i := 0; i < len(recordRegionMetas); i += regionBatch {
298-
end := i + regionBatch
299-
if end > len(recordRegionMetas) {
300-
end = len(recordRegionMetas)
301-
}
298+
end := min(i+regionBatch, len(recordRegionMetas))
302299
batch := recordRegionMetas[i:end]
303300
subTaskMeta := &BackfillSubTaskMeta{
304301
RowStart: batch[0].StartKey(),
@@ -328,6 +325,9 @@ func generateNonPartitionPlan(
328325
}
329326

330327
func calculateRegionBatch(totalRegionCnt int, instanceCnt int, useLocalDisk bool) int {
328+
failpoint.Inject("mockRegionBatch", func(val failpoint.Value) {
329+
failpoint.Return(val.(int))
330+
})
331331
var regionBatch int
332332
avgTasksPerInstance := (totalRegionCnt + instanceCnt - 1) / instanceCnt // ceiling
333333
if useLocalDisk {

pkg/ddl/index.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1956,6 +1956,9 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error {
19561956
}
19571957
//nolint:forcetypeassert
19581958
discovery := w.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
1959+
if reorgInfo.ReorgMeta.UseCloudStorage {
1960+
return nil
1961+
}
19591962
return checkDuplicateForUniqueIndex(w.ctx, t, reorgInfo, discovery)
19601963
}
19611964
}

pkg/disttask/framework/taskexecutor/task_executor.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ var (
6363
MockTiDBDown func(execID string, task *proto.TaskBase) bool
6464
)
6565

66+
var (
67+
// GetErrorSubtask4Test is used for UT to collect error
68+
GetErrorSubtask4Test atomic.Pointer[proto.TaskBase]
69+
)
70+
6671
// BaseTaskExecutor is the base implementation of TaskExecutor.
6772
type BaseTaskExecutor struct {
6873
// id, it's the same as server id now, i.e. host:port.
@@ -537,6 +542,9 @@ func (e *BaseTaskExecutor) onError(err error) {
537542
if err == nil {
538543
return
539544
}
545+
failpoint.Inject("collectTaskError", func() {
546+
GetErrorSubtask4Test.CompareAndSwap(nil, e.GetTaskBase())
547+
})
540548
err = errors.Trace(err)
541549
e.logger.Error("onError", zap.Error(err), zap.Stack("stack"))
542550
e.mu.Lock()

tests/realtikvtest/addindextest2/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,19 @@ go_test(
1111
deps = [
1212
"//br/pkg/storage",
1313
"//pkg/config",
14+
"//pkg/ddl",
1415
"//pkg/ddl/util/callback",
16+
"//pkg/disttask/framework/proto",
1517
"//pkg/disttask/framework/scheduler",
18+
"//pkg/disttask/framework/taskexecutor",
1619
"//pkg/kv",
1720
"//pkg/lightning/backend/external",
1821
"//pkg/parser/model",
1922
"//pkg/sessionctx/variable",
2023
"//pkg/store/helper",
2124
"//pkg/tablecodec",
2225
"//pkg/testkit",
26+
"//pkg/testkit/testfailpoint",
2327
"//pkg/types",
2428
"//tests/realtikvtest",
2529
"@com_github_fsouza_fake_gcs_server//fakestorage",

tests/realtikvtest/addindextest2/global_sort_test.go

Lines changed: 99 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"fmt"
2020
"strconv"
2121
"strings"
22+
"sync/atomic"
2223
"testing"
2324
"time"
2425

@@ -27,15 +28,19 @@ import (
2728
"github.com/pingcap/failpoint"
2829
"github.com/pingcap/tidb/br/pkg/storage"
2930
"github.com/pingcap/tidb/pkg/config"
31+
"github.com/pingcap/tidb/pkg/ddl"
3032
"github.com/pingcap/tidb/pkg/ddl/util/callback"
33+
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
3134
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
35+
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor"
3236
"github.com/pingcap/tidb/pkg/kv"
3337
"github.com/pingcap/tidb/pkg/lightning/backend/external"
3438
"github.com/pingcap/tidb/pkg/parser/model"
3539
"github.com/pingcap/tidb/pkg/sessionctx/variable"
3640
"github.com/pingcap/tidb/pkg/store/helper"
3741
"github.com/pingcap/tidb/pkg/tablecodec"
3842
"github.com/pingcap/tidb/pkg/testkit"
43+
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
3944
"github.com/pingcap/tidb/pkg/types"
4045
"github.com/pingcap/tidb/tests/realtikvtest"
4146
"github.com/stretchr/testify/require"
@@ -261,16 +266,104 @@ func TestGlobalSortDuplicateErrMsg(t *testing.T) {
261266
tk.MustExec(`set @@global.tidb_ddl_enable_fast_reorg = 1;`)
262267
tk.MustExec("set @@global.tidb_enable_dist_task = 1;")
263268
tk.MustExec(fmt.Sprintf(`set @@global.tidb_cloud_storage_uri = "%s"`, cloudStorageURI))
264-
defer func() {
269+
atomic.StoreUint32(&ddl.EnableSplitTableRegion, 1)
270+
tk.MustExec("set @@global.tidb_scatter_region = on")
271+
t.Cleanup(func() {
265272
tk.MustExec("set @@global.tidb_enable_dist_task = 0;")
266273
variable.CloudStorageURI.Store("")
267-
}()
274+
atomic.StoreUint32(&ddl.EnableSplitTableRegion, 0)
275+
tk.MustExec("set @@global.tidb_scatter_region = default")
276+
})
277+
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/collectTaskError", "return(true)")
278+
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/mockRegionBatch", `return(1)`)
279+
280+
testcases := []struct {
281+
caseName string
282+
createTableSQL string
283+
splitTableSQL string
284+
initDataSQL string
285+
addUniqueKeySQL string
286+
errMsg string
287+
}{
288+
{
289+
"int index",
290+
"create table t (a int, b int, c int);",
291+
"",
292+
"insert into t values (1, 1, 1), (2, 1, 2);",
293+
"alter table t add unique index idx(b);",
294+
"Duplicate entry '1' for key 't.idx",
295+
},
296+
{
297+
"int index on multi regions",
298+
"create table t (a int primary key, b int);",
299+
"split table t between (0) and (4000) regions 4;",
300+
"insert into t values (1, 1), (1001, 1), (2001, 2001), (4001, 1);",
301+
"alter table t add unique index idx(b);",
302+
"[kv:1062]Duplicate entry '1' for key 't.idx'",
303+
},
304+
{
305+
"varchar index",
306+
"create table t (id int, data varchar(255));",
307+
"",
308+
"insert into t values (1, '1'), (2, '1');",
309+
"alter table t add unique index i(data);",
310+
"[kv:1062]Duplicate entry '1' for key 't.i'",
311+
},
312+
{
313+
"combined index",
314+
"create table t (id int, data varchar(255));",
315+
"",
316+
"insert into t values (1, '1'), (1, '1');",
317+
"alter table t add unique index i(id, data);",
318+
"[kv:1062]Duplicate entry '1-1' for key 't.i'",
319+
},
320+
{
321+
"multi value index",
322+
"create table t (id int, data json);",
323+
"",
324+
`insert into t values (1, '{"code":[1,1]}'), (2, '{"code":[1,1]}');`,
325+
"alter table t add unique index zips( (CAST(data->'$.code' AS UNSIGNED ARRAY)));",
326+
"Duplicate entry '1' for key 't.zips",
327+
},
328+
{
329+
"global index",
330+
"create table t (k int, c int) partition by list (k) (partition odd values in (1,3,5,7,9), partition even values in (2,4,6,8,10));",
331+
"",
332+
"insert into t values (1, 1), (2, 1)",
333+
"alter table t add unique index i(c) global",
334+
"[kv:1062]Duplicate entry '1' for key 't.i'",
335+
},
336+
}
268337

269-
tk.MustExec("create table t (a int, b int, c int);")
270-
tk.MustExec("insert into t values (1, 1, 1);")
271-
tk.MustExec("insert into t values (2, 1, 2);")
338+
checkSubtaskErr := func(t *testing.T) {
339+
errorSubtask := taskexecutor.GetErrorSubtask4Test.Swap(nil)
340+
require.NotEmpty(t, errorSubtask)
341+
require.Equal(t, proto.BackfillStepWriteAndIngest, errorSubtask.Step)
342+
}
343+
344+
for _, tc := range testcases {
345+
t.Run(tc.caseName, func(tt *testing.T) {
346+
// init
347+
taskexecutor.GetErrorSubtask4Test.Store(nil)
348+
tk.MustExec(tc.createTableSQL)
349+
tk.MustExec(tc.initDataSQL)
350+
tt.Cleanup(func() {
351+
tk.MustExec("drop table if exists t")
352+
})
353+
354+
// pre-check: run the optional split statement and, for partitioned tables, verify the table spans two regions
355+
if len(tc.splitTableSQL) > 0 {
356+
tk.MustQuery(tc.splitTableSQL).Check(testkit.Rows("3 1"))
357+
}
358+
if strings.Contains(tc.createTableSQL, "partition") {
359+
rs := tk.MustQuery("show table t regions")
360+
require.Len(tt, rs.Rows(), 2)
361+
}
272362

273-
tk.MustGetErrMsg("alter table t add unique index idx(b);", "[kv:1062]Duplicate entry '1' for key 't.idx'")
363+
tk.MustContainErrMsg(tc.addUniqueKeySQL, tc.errMsg)
364+
checkSubtaskErr(tt)
365+
})
366+
}
274367
}
275368

276369
func TestIngestUseGivenTS(t *testing.T) {

0 commit comments

Comments
 (0)