Skip to content

Commit f7c4f68

Browse files
authored
globalsort: split simplesst out of globalsort (#68479)
ref #68346
1 parent f3ea6d7 commit f7c4f68

64 files changed

Lines changed: 1922 additions & 1640 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

pkg/ddl/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ go_library(
114114
"//pkg/ingestor/engineapi",
115115
"//pkg/ingestor/globalsort",
116116
"//pkg/ingestor/ingestctrl",
117+
"//pkg/ingestor/simplesst",
117118
"//pkg/keyspace",
118119
"//pkg/kv",
119120
"//pkg/lightning/backend",
@@ -344,6 +345,7 @@ go_test(
344345
"//pkg/infoschema",
345346
"//pkg/infoschema/validatorapi",
346347
"//pkg/ingestor/globalsort",
348+
"//pkg/ingestor/simplesst",
347349
"//pkg/keyspace",
348350
"//pkg/kv",
349351
"//pkg/meta",

pkg/ddl/backfilling_dist_scheduler.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
diststorage "github.com/pingcap/tidb/pkg/dxf/framework/storage"
3838
"github.com/pingcap/tidb/pkg/ingestor/globalsort"
3939
"github.com/pingcap/tidb/pkg/ingestor/ingestctrl"
40+
"github.com/pingcap/tidb/pkg/ingestor/simplesst"
4041
"github.com/pingcap/tidb/pkg/kv"
4142
"github.com/pingcap/tidb/pkg/meta"
4243
"github.com/pingcap/tidb/pkg/meta/model"
@@ -239,11 +240,11 @@ func (sch *LitBackfillScheduler) GetNextStep(task *proto.TaskBase) proto.Step {
239240
}
240241
}
241242

242-
func skipMergeSort(stats []globalsort.MultipleFilesStat, concurrency int) bool {
243+
func skipMergeSort(stats []simplesst.MultipleFilesStat, concurrency int) bool {
243244
failpoint.Inject("forceMergeSort", func() {
244245
failpoint.Return(false)
245246
})
246-
return globalsort.GetMaxOverlappingTotal(stats) <= globalsort.GetAdjustedMergeSortOverlapThreshold(concurrency)
247+
return simplesst.GetMaxOverlappingTotal(stats) <= simplesst.GetAdjustedMergeSortOverlapThreshold(concurrency)
247248
}
248249

249250
// OnDone implements scheduler.Extension interface.
@@ -642,7 +643,7 @@ func generateMergeSortPlan(
642643
// check data files overlaps,
643644
// if data files overlaps too much, we need a merge step.
644645
var (
645-
multiStatsGroup [][]globalsort.MultipleFilesStat
646+
multiStatsGroup [][]simplesst.MultipleFilesStat
646647
kvMetaGroups []*globalsort.SortedKVMeta
647648
eleIDs []int64
648649
)
@@ -657,13 +658,13 @@ func generateMergeSortPlan(
657658
func(subtask *BackfillSubTaskMeta) {
658659
if kvMetaGroups == nil {
659660
kvMetaGroups = make([]*globalsort.SortedKVMeta, len(subtask.MetaGroups))
660-
multiStatsGroup = make([][]globalsort.MultipleFilesStat, len(subtask.MetaGroups))
661+
multiStatsGroup = make([][]simplesst.MultipleFilesStat, len(subtask.MetaGroups))
661662
eleIDs = subtask.EleIDs
662663
}
663664
for i, g := range subtask.MetaGroups {
664665
if kvMetaGroups[i] == nil {
665666
kvMetaGroups[i] = &globalsort.SortedKVMeta{}
666-
multiStatsGroup[i] = make([]globalsort.MultipleFilesStat, 0, 100)
667+
multiStatsGroup[i] = make([]simplesst.MultipleFilesStat, 0, 100)
667668
}
668669
kvMetaGroups[i].Merge(g)
669670
multiStatsGroup[i] = append(multiStatsGroup[i], g.MultipleFilesStats...)
@@ -743,7 +744,7 @@ func getRangeSplitter(
743744
cloudStorageURI string,
744745
totalSize int64,
745746
instanceCnt int64,
746-
multiFileStat []globalsort.MultipleFilesStat,
747+
multiFileStat []simplesst.MultipleFilesStat,
747748
logger *zap.Logger,
748749
) (*globalsort.RangeSplitter, error) {
749750
backend, err := objstore.ParseBackend(cloudStorageURI, nil)

pkg/ddl/backfilling_dist_scheduler_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/pingcap/tidb/pkg/dxf/framework/scheduler"
3232
"github.com/pingcap/tidb/pkg/dxf/framework/storage"
3333
"github.com/pingcap/tidb/pkg/ingestor/globalsort"
34+
"github.com/pingcap/tidb/pkg/ingestor/simplesst"
3435
"github.com/pingcap/tidb/pkg/keyspace"
3536
"github.com/pingcap/tidb/pkg/meta"
3637
"github.com/pingcap/tidb/pkg/meta/model"
@@ -201,7 +202,7 @@ func TestBackfillingSchedulerGlobalSortMode(t *testing.T) {
201202
StartKey: []byte("ta"),
202203
EndKey: []byte("tc"),
203204
TotalKVSize: 12,
204-
MultipleFilesStats: []globalsort.MultipleFilesStat{
205+
MultipleFilesStats: []simplesst.MultipleFilesStat{
205206
{
206207
Filenames: [][2]string{
207208
{"gs://sort-bucket/data/1", "gs://sort-bucket/data/1.stat"},
@@ -241,7 +242,7 @@ func TestBackfillingSchedulerGlobalSortMode(t *testing.T) {
241242
StartKey: []byte("ta"),
242243
EndKey: []byte("tc"),
243244
TotalKVSize: 12,
244-
MultipleFilesStats: []globalsort.MultipleFilesStat{
245+
MultipleFilesStats: []simplesst.MultipleFilesStat{
245246
{
246247
Filenames: [][2]string{
247248
{"gs://sort-bucket/data/1", "gs://sort-bucket/data/1.stat"},

pkg/ddl/backfilling_merge_sort.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/pingcap/tidb/pkg/dxf/framework/taskexecutor/execute"
3232
"github.com/pingcap/tidb/pkg/ingestor/engineapi"
3333
"github.com/pingcap/tidb/pkg/ingestor/globalsort"
34+
"github.com/pingcap/tidb/pkg/ingestor/simplesst"
3435
"github.com/pingcap/tidb/pkg/kv"
3536
"github.com/pingcap/tidb/pkg/meta/model"
3637
"github.com/pingcap/tidb/pkg/objstore/storeapi"
@@ -98,7 +99,7 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
9899
}
99100

100101
m.subtaskSortedKVMeta = &globalsort.SortedKVMeta{}
101-
onWriterClose := func(summary *globalsort.WriterSummary) {
102+
onWriterClose := func(summary *simplesst.WriterSummary) {
102103
m.mu.Lock()
103104
m.subtaskSortedKVMeta.MergeSummary(summary)
104105
m.mu.Unlock()
@@ -107,15 +108,15 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
107108
prefix := path.Join(strconv.Itoa(int(subtask.TaskID)), strconv.Itoa(int(subtask.ID)))
108109
res := m.GetResource()
109110
memSizePerCon := res.MemoryPerCore()
110-
partSize := max(globalsort.MinUploadPartSize, memSizePerCon*int64(globalsort.MaxMergingFilesPerThread)/globalsort.MaxUploadPartCount)
111+
partSize := max(simplesst.MinUploadPartSize, memSizePerCon*int64(globalsort.MaxMergingFilesPerThread)/simplesst.MaxUploadPartCount)
111112

112113
wctx := workerpool.NewContext(ctx)
113114
op := globalsort.NewMergeOperator(
114115
wctx,
115116
objStore,
116117
partSize,
117118
prefix,
118-
globalsort.DefaultBlockSize,
119+
simplesst.DefaultBlockSize,
119120
onWriterClose,
120121
globalsort.NewMergeCollector(ctx, nil),
121122
int(res.CPU.Capacity()),

pkg/ddl/backfilling_operators.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import (
3636
"github.com/pingcap/tidb/pkg/dxf/framework/taskexecutor/execute"
3737
"github.com/pingcap/tidb/pkg/dxf/operator"
3838
"github.com/pingcap/tidb/pkg/ingestor/engineapi"
39-
"github.com/pingcap/tidb/pkg/ingestor/globalsort"
39+
"github.com/pingcap/tidb/pkg/ingestor/simplesst"
4040
"github.com/pingcap/tidb/pkg/kv"
4141
"github.com/pingcap/tidb/pkg/meta/model"
4242
"github.com/pingcap/tidb/pkg/metrics"
@@ -159,7 +159,7 @@ func NewWriteIndexToExternalStoragePipeline(
159159
tbl table.PhysicalTable,
160160
idxInfos []*model.IndexInfo,
161161
startKey, endKey kv.Key,
162-
onClose globalsort.OnWriterCloseFunc,
162+
onClose simplesst.OnWriterCloseFunc,
163163
reorgMeta *model.DDLReorgMeta,
164164
avgRowSize int,
165165
concurrency int,
@@ -647,7 +647,7 @@ func NewWriteExternalStoreOperator(
647647
store storeapi.Storage,
648648
srcChunkPool *sync.Pool,
649649
concurrency int,
650-
onClose globalsort.OnWriterCloseFunc,
650+
onClose simplesst.OnWriterCloseFunc,
651651
memoryQuota uint64,
652652
reorgMeta *model.DDLReorgMeta,
653653
tikvCodec tikv.Codec,
@@ -659,15 +659,15 @@ func NewWriteExternalStoreOperator(
659659
})
660660

661661
totalCount := new(atomic.Int64)
662-
blockSize := globalsort.GetAdjustedBlockSize(memoryQuota, globalsort.DefaultBlockSize)
662+
blockSize := simplesst.GetAdjustedBlockSize(memoryQuota, simplesst.DefaultBlockSize)
663663
pool := workerpool.NewWorkerPool(
664664
"WriteExternalStoreOperator",
665665
util.DDL,
666666
concurrency,
667667
func() workerpool.Worker[IndexRecordChunk, IndexWriteResult] {
668668
writers := make([]ingest.Writer, 0, len(indexes))
669669
for i := range indexes {
670-
builder := globalsort.NewWriterBuilder().
670+
builder := simplesst.NewWriterBuilder().
671671
SetOnCloseFunc(onClose).
672672
SetMemorySizeLimit(memoryQuota).
673673
SetTiKVCodec(tikvCodec).
@@ -879,7 +879,7 @@ func (w *indexIngestWorker) Close() error {
879879
var gerr error
880880

881881
for i, writer := range w.writers {
882-
ew, ok := writer.(*globalsort.Writer)
882+
ew, ok := writer.(*simplesst.Writer)
883883
if !ok {
884884
break
885885
}

pkg/ddl/backfilling_read_index.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/pingcap/tidb/pkg/dxf/operator"
3838
"github.com/pingcap/tidb/pkg/ingestor/globalsort"
3939
"github.com/pingcap/tidb/pkg/ingestor/ingestctrl"
40+
"github.com/pingcap/tidb/pkg/ingestor/simplesst"
4041
"github.com/pingcap/tidb/pkg/kv"
4142
lightningmetric "github.com/pingcap/tidb/pkg/lightning/metric"
4243
"github.com/pingcap/tidb/pkg/meta/model"
@@ -426,7 +427,7 @@ func (r *readIndexStepExecutor) buildExternalStorePipeline(
426427
return nil, err
427428
}
428429

429-
onWriterClose := func(summary *globalsort.WriterSummary) {
430+
onWriterClose := func(summary *simplesst.WriterSummary) {
430431
sum, _ := r.summaryMap.Load(subtaskID)
431432
s := sum.(*readIndexSummary)
432433
s.mu.Lock()

pkg/dxf/importinto/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ go_library(
4343
"//pkg/ingestor/engineapi",
4444
"//pkg/ingestor/globalsort",
4545
"//pkg/ingestor/ingestctrl",
46+
"//pkg/ingestor/simplesst",
4647
"//pkg/kv",
4748
"//pkg/lightning/backend",
4849
"//pkg/lightning/backend/kv",
@@ -128,6 +129,7 @@ go_test(
128129
"//pkg/executor/importer",
129130
"//pkg/ingestor/engineapi",
130131
"//pkg/ingestor/globalsort",
132+
"//pkg/ingestor/simplesst",
131133
"//pkg/keyspace",
132134
"//pkg/kv",
133135
"//pkg/lightning/backend",

pkg/dxf/importinto/conflict_resolution_test.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/pingcap/tidb/pkg/executor/importer"
2929
"github.com/pingcap/tidb/pkg/ingestor/engineapi"
3030
"github.com/pingcap/tidb/pkg/ingestor/globalsort"
31+
"github.com/pingcap/tidb/pkg/ingestor/simplesst"
3132
tidbkv "github.com/pingcap/tidb/pkg/kv"
3233
"github.com/pingcap/tidb/pkg/lightning/backend/encode"
3334
"github.com/pingcap/tidb/pkg/objstore/storeapi"
@@ -43,13 +44,13 @@ import (
4344
"go.uber.org/zap"
4445
)
4546

46-
func writeConflictKVFile(t *testing.T, codec tikv.Codec, kvGroup string, objStore storeapi.Storage, kvs []*globalsort.KVPair) *engineapi.ConflictInfo {
47+
func writeConflictKVFile(t *testing.T, codec tikv.Codec, kvGroup string, objStore storeapi.Storage, kvs []*simplesst.KVPair) *engineapi.ConflictInfo {
4748
t.Helper()
4849
ctx := context.Background()
49-
var summary *globalsort.WriterSummary
50-
w := globalsort.NewWriterBuilder().
50+
var summary *simplesst.WriterSummary
51+
w := simplesst.NewWriterBuilder().
5152
SetTiKVCodec(codec).
52-
SetOnCloseFunc(func(s *globalsort.WriterSummary) { summary = s }).
53+
SetOnCloseFunc(func(s *simplesst.WriterSummary) { summary = s }).
5354
Build(objStore, "/test", kvGroup)
5455
for _, kv := range kvs {
5556
require.NoError(t, w.WriteRow(ctx, kv.Key, kv.Value, nil))
@@ -78,22 +79,22 @@ func generateConflictKVFiles(t *testing.T, tempDir string, tbl table.Table, code
7879

7980
// total 3 * 2 conflicted data KVs, and 3 conflicted index KVs, they will be
8081
// taken as 9 conflicted rows.
81-
dupDataKVs := make([]*globalsort.KVPair, 0, 6)
82-
dupIndexKVs := make([]*globalsort.KVPair, 0, 3)
82+
dupDataKVs := make([]*simplesst.KVPair, 0, 6)
83+
dupIndexKVs := make([]*simplesst.KVPair, 0, 3)
8384
for i := range 3 {
8485
dupID := i + 1
8586
row := []types.Datum{types.NewDatum(dupID), types.NewDatum(dupID), types.NewDatum(dupID)}
8687
dupPairs, err2 := localEncoder.Encode(row, int64(dupID))
8788
require.NoError(t, err2)
8889
for _, pair := range dupPairs.Pairs {
8990
if tablecodec.IsRecordKey(pair.Key) {
90-
kv := &globalsort.KVPair{Key: pair.Key, Value: pair.Val}
91+
kv := &simplesst.KVPair{Key: pair.Key, Value: pair.Val}
9192
dupDataKVs = append(dupDataKVs, kv, kv)
9293
} else {
9394
indexID, err := tablecodec.DecodeIndexID(pair.Key)
9495
require.NoError(t, err)
9596
if indexID == 2 {
96-
kv := &globalsort.KVPair{Key: pair.Key, Value: pair.Val}
97+
kv := &simplesst.KVPair{Key: pair.Key, Value: pair.Val}
9798
dupIndexKVs = append(dupIndexKVs, kv)
9899
}
99100
}

pkg/dxf/importinto/conflictedkv/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ go_library(
1717
"//pkg/dxf/framework/taskexecutor/execute",
1818
"//pkg/executor/importer",
1919
"//pkg/ingestor/globalsort",
20+
"//pkg/ingestor/simplesst",
2021
"//pkg/kv",
2122
"//pkg/lightning/backend/kv",
2223
"//pkg/lightning/common",
@@ -57,6 +58,7 @@ go_test(
5758
"//pkg/dxf/framework/taskexecutor/execute",
5859
"//pkg/executor/importer",
5960
"//pkg/ingestor/globalsort",
61+
"//pkg/ingestor/simplesst",
6062
"//pkg/kv",
6163
"//pkg/lightning/backend/encode",
6264
"//pkg/lightning/backend/kv",

pkg/dxf/importinto/conflictedkv/collector.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/pingcap/tidb/pkg/dxf/framework/taskexecutor/execute"
2727
"github.com/pingcap/tidb/pkg/executor/importer"
2828
"github.com/pingcap/tidb/pkg/ingestor/globalsort"
29+
"github.com/pingcap/tidb/pkg/ingestor/simplesst"
2930
tidbkv "github.com/pingcap/tidb/pkg/kv"
3031
"github.com/pingcap/tidb/pkg/lightning/backend/kv"
3132
"github.com/pingcap/tidb/pkg/lightning/common"
@@ -143,7 +144,7 @@ func NewCollector(
143144
}
144145

145146
// Run starts the collector to collect conflicted KV info.
146-
func (c *Collector) Run(ctx context.Context, ch chan *globalsort.KVPair) (err error) {
147+
func (c *Collector) Run(ctx context.Context, ch chan *simplesst.KVPair) (err error) {
147148
if err = c.handler.PreRun(); err != nil {
148149
return err
149150
}
@@ -224,7 +225,7 @@ func (c *Collector) switchFile(ctx context.Context) error {
224225
zap.String("lastFileSize", units.BytesSize(float64(c.currFileSize))))
225226
writer, err := c.store.Create(ctx, filename, &storeapi.WriterOption{
226227
Concurrency: 20,
227-
PartSize: globalsort.MinUploadPartSize,
228+
PartSize: simplesst.MinUploadPartSize,
228229
})
229230
if err != nil {
230231
return errors.Trace(err)

0 commit comments

Comments
 (0)