Skip to content

Commit b164c62

Browse files
committed
download schema log files concurrently
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
1 parent 4b0c751 commit b164c62

8 files changed

Lines changed: 216 additions & 30 deletions

File tree

br/pkg/restore/log_client/client.go

Lines changed: 48 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ import (
8383

8484
const MetaKVBatchSize = 64 * 1024 * 1024
8585
const maxSplitKeysOnce = 10240
86+
const maxReadMetaKVFilesConcurrency uint = 128
8687

8788
// rawKVBatchCount specifies the count of entries that the rawkv client puts into TiKV.
8889
const rawKVBatchCount = 64
@@ -1121,15 +1122,15 @@ func SeparateAndSortFilesByCF(files []*backuppb.DataFileInfo) ([]*backuppb.DataF
11211122
// The error of transactions of meta could happen if restore write CF events successfully,
11221123
// but failed to restore default CF events.
11231124
for _, f := range files {
1124-
if f.Cf == consts.WriteCF {
1125-
filesInWriteCF = append(filesInWriteCF, f)
1126-
continue
1127-
}
1128-
if f.Type == backuppb.FileType_Delete {
1129-
log.Warn("internal error: detected delete file of meta key, skip it", zap.Any("file", f))
1125+
if !shouldReadMetaKVFile(f) {
1126+
if f.Type == backuppb.FileType_Delete {
1127+
log.Warn("internal error: detected delete file of meta key, skip it", zap.Any("file", f))
1128+
}
11301129
continue
11311130
}
1132-
if f.Cf == consts.DefaultCF {
1131+
if f.Cf == consts.WriteCF {
1132+
filesInWriteCF = append(filesInWriteCF, f)
1133+
} else {
11331134
filesInDefaultCF = append(filesInDefaultCF, f)
11341135
}
11351136
}
@@ -1140,6 +1141,26 @@ func SeparateAndSortFilesByCF(files []*backuppb.DataFileInfo) ([]*backuppb.DataF
11401141
return filesInDefaultCF, filesInWriteCF
11411142
}
11421143

1144+
func shouldReadMetaKVFile(file *backuppb.DataFileInfo) bool {
1145+
if file.Cf == consts.WriteCF {
1146+
return true
1147+
}
1148+
if file.Type == backuppb.FileType_Delete {
1149+
return false
1150+
}
1151+
return file.Cf == consts.DefaultCF
1152+
}
1153+
1154+
func countReadableMetaKVFiles(files []*backuppb.DataFileInfo) int {
1155+
count := 0
1156+
for _, file := range files {
1157+
if shouldReadMetaKVFile(file) {
1158+
count++
1159+
}
1160+
}
1161+
return count
1162+
}
1163+
11431164
// LoadAndProcessMetaKVFilesInBatch restores meta kv files to TiKV in strict TS order. It does so in batch and after
11441165
// success it triggers an update so every TiDB node can pick up the restored content.
11451166
func LoadAndProcessMetaKVFilesInBatch(
@@ -1290,14 +1311,28 @@ func (rc *LogClient) filterAndSortKvEntriesFromFiles(
12901311
}
12911312

12921313
// read all entries from files.
1293-
for _, f := range files {
1294-
es, filteredOutEs, err := rc.ReadFilteredEntriesFromFiles(ctx, f, filterTS)
1295-
if err != nil {
1314+
if len(files) > 0 {
1315+
eg, egCtx := errgroup.WithContext(ctx)
1316+
workerPool := tidbutil.NewWorkerPool(min(uint(len(files)), maxReadMetaKVFilesConcurrency), "read meta kv files")
1317+
var entriesLock sync.Mutex
1318+
for _, f := range files {
1319+
file := f
1320+
workerPool.ApplyOnErrorGroup(eg, func() error {
1321+
es, filteredOutEs, err := rc.ReadFilteredEntriesFromFiles(egCtx, file, filterTS)
1322+
if err != nil {
1323+
return errors.Trace(err)
1324+
}
1325+
1326+
entriesLock.Lock()
1327+
curKvEntries = append(curKvEntries, es...)
1328+
filteredOutKvEntries = append(filteredOutKvEntries, filteredOutEs...)
1329+
entriesLock.Unlock()
1330+
return nil
1331+
})
1332+
}
1333+
if err := eg.Wait(); err != nil {
12961334
return nil, nil, errors.Trace(err)
12971335
}
1298-
1299-
curKvEntries = append(curKvEntries, es...)
1300-
filteredOutKvEntries = append(filteredOutKvEntries, filteredOutEs...)
13011336
}
13021337

13031338
// sort these entries.

br/pkg/restore/log_client/client_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -772,6 +772,34 @@ func TestSortMetaKVFiles(t *testing.T) {
772772
require.Equal(t, files[2].Path, "f3")
773773
require.Equal(t, files[3].Path, "f4")
774774
require.Equal(t, files[4].Path, "f5")
775+
776+
files = []*backuppb.DataFileInfo{
777+
{
778+
Path: "write-put",
779+
Cf: consts.WriteCF,
780+
Type: backuppb.FileType_Put,
781+
},
782+
{
783+
Path: "write-delete",
784+
Cf: consts.WriteCF,
785+
Type: backuppb.FileType_Delete,
786+
},
787+
{
788+
Path: "default-put",
789+
Cf: consts.DefaultCF,
790+
Type: backuppb.FileType_Put,
791+
},
792+
{
793+
Path: "default-delete",
794+
Cf: consts.DefaultCF,
795+
Type: backuppb.FileType_Delete,
796+
},
797+
}
798+
defaultFiles, writeFiles := logclient.SeparateAndSortFilesByCF(files)
799+
require.Len(t, defaultFiles, 1)
800+
require.Equal(t, "default-put", defaultFiles[0].Path)
801+
require.Len(t, writeFiles, 2)
802+
require.Equal(t, 3, logclient.TEST_CountReadableMetaKVFiles(files))
775803
}
776804

777805
func toLogDataFileInfoIter(logIter iter.TryNextor[*backuppb.DataFileInfo]) logclient.LogIter {

br/pkg/restore/log_client/export_test.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package logclient
1616

1717
import (
1818
"context"
19+
"sync/atomic"
1920

2021
"github.com/pingcap/errors"
2122
backuppb "github.com/pingcap/kvproto/pkg/brpb"
@@ -121,21 +122,48 @@ func TEST_NewLogFileManager(startTS, restoreTS, shiftStartTS uint64, helper stre
121122
}
122123
}
123124

125+
func TEST_CountReadableMetaKVFiles(files []*backuppb.DataFileInfo) int {
126+
return countReadableMetaKVFiles(files)
127+
}
128+
124129
type FakeStreamMetadataHelper struct {
125130
streamMetadataHelper
126131

127-
Data []byte
132+
Data []byte
133+
ReadGate <-chan struct{}
134+
active atomic.Int32
135+
maxActive atomic.Int32
136+
}
137+
138+
func (helper *FakeStreamMetadataHelper) ActiveReadCount() int32 {
139+
return helper.active.Load()
140+
}
141+
142+
func (helper *FakeStreamMetadataHelper) MaxActiveReadCount() int32 {
143+
return helper.maxActive.Load()
128144
}
129145

130146
func (helper *FakeStreamMetadataHelper) ReadFile(
131147
ctx context.Context,
132148
path string,
133149
offset uint64,
134150
length uint64,
151+
rawLength uint64,
135152
compressionType backuppb.CompressionType,
136153
storage storeapi.Storage,
137154
encryptionInfo *encryptionpb.FileEncryptionInfo,
138155
) ([]byte, error) {
156+
active := helper.active.Add(1)
157+
for {
158+
maxActive := helper.maxActive.Load()
159+
if active <= maxActive || helper.maxActive.CompareAndSwap(maxActive, active) {
160+
break
161+
}
162+
}
163+
defer helper.active.Add(-1)
164+
if helper.ReadGate != nil {
165+
<-helper.ReadGate
166+
}
139167
return helper.Data[offset : offset+length], nil
140168
}
141169

br/pkg/restore/log_client/log_file_manager.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ type streamMetadataHelper interface {
8282
path string,
8383
offset uint64,
8484
length uint64,
85+
rawLength uint64,
8586
compressionType backuppb.CompressionType,
8687
storage storeapi.Storage,
8788
encryptionInfo *encryptionpb.FileEncryptionInfo,
@@ -336,7 +337,7 @@ func (lm *LogFileManager) collectDDLFilesAndPrepareCache(
336337

337338
dataFileInfos := make([]*backuppb.DataFileInfo, 0)
338339
for _, g := range fs.Item {
339-
lm.helper.InitCacheEntry(g.Path, len(g.FileMetas))
340+
lm.helper.InitCacheEntry(g.Path, countReadableMetaKVFiles(g.FileMetas))
340341
dataFileInfos = append(dataFileInfos, g.FileMetas...)
341342
}
342343

@@ -460,7 +461,7 @@ func (lm *LogFileManager) ReadFilteredEntriesFromFiles(
460461
kvEntries := make([]*KvEntryWithTS, 0)
461462
filteredOutKvEntries := make([]*KvEntryWithTS, 0)
462463

463-
buff, err := lm.helper.ReadFile(ctx, file.Path, file.RangeOffset, file.RangeLength, file.CompressionType,
464+
buff, err := lm.helper.ReadFile(ctx, file.Path, file.RangeOffset, file.RangeLength, file.Length, file.CompressionType,
464465
lm.storage, file.FileEncryptionInfo)
465466
if err != nil {
466467
return nil, nil, errors.Trace(err)

br/pkg/restore/log_client/log_file_manager_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"strings"
1717
"sync/atomic"
1818
"testing"
19+
"time"
1920

2021
"github.com/pingcap/errors"
2122
backuppb "github.com/pingcap/kvproto/pkg/brpb"
@@ -612,6 +613,7 @@ func generateKvData() ([]byte, logclient.Log) {
612613
Sha256: sha256[:],
613614
RangeOffset: rangeOffset,
614615
RangeLength: rangeLength,
616+
Length: rangeLength,
615617
}
616618
}
617619

@@ -646,4 +648,26 @@ func TestReadAllEntries(t *testing.T) {
646648
encodekvEntryWithTS("mDDL", 65),
647649
}, nextKvEntries)
648650
}
651+
{
652+
readGate := make(chan struct{})
653+
helper := &logclient.FakeStreamMetadataHelper{Data: data, ReadGate: readGate}
654+
fm := logclient.TEST_NewLogFileManager(35, 75, 25, helper)
655+
file.Cf = consts.DefaultCF
656+
errCh := make(chan error, 4)
657+
for range 4 {
658+
go func() {
659+
_, _, err := fm.ReadFilteredEntriesFromFiles(ctx, file, 50)
660+
errCh <- err
661+
}()
662+
}
663+
require.Eventually(t, func() bool {
664+
return helper.ActiveReadCount() == 4
665+
}, time.Second, 10*time.Millisecond)
666+
require.Equal(t, int32(4), helper.MaxActiveReadCount())
667+
close(readGate)
668+
for range 4 {
669+
require.NoError(t, <-errCh)
670+
}
671+
require.Equal(t, int32(4), helper.MaxActiveReadCount())
672+
}
649673
}

br/pkg/stream/stream_mgr.go

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"crypto/sha256"
2020
"encoding/hex"
2121
"strings"
22+
"sync"
2223

2324
"github.com/klauspost/compress/zstd"
2425
"github.com/pingcap/errors"
@@ -156,6 +157,7 @@ func BuildObserveMetaRange() *kv.KeyRange {
156157
}
157158

158159
type ContentRef struct {
160+
mu sync.Mutex
159161
init_ref int
160162
ref int
161163
data []byte
@@ -164,6 +166,7 @@ type ContentRef struct {
164166
// MetadataHelper make restore/truncate compatible with metadataV1 and metadataV2.
165167
type MetadataHelper struct {
166168
cache map[string]*ContentRef
169+
cacheMu sync.RWMutex
167170
decoder *zstd.Decoder
168171
encryptionManager *encryption.Manager
169172
}
@@ -194,19 +197,21 @@ func (m *MetadataHelper) InitCacheEntry(path string, ref int) {
194197
if ref <= 0 {
195198
return
196199
}
200+
m.cacheMu.Lock()
201+
defer m.cacheMu.Unlock()
197202
m.cache[path] = &ContentRef{
198203
init_ref: ref,
199204
ref: ref,
200205
data: nil,
201206
}
202207
}
203208

204-
func (m *MetadataHelper) decodeCompressedData(data []byte, compressionType backuppb.CompressionType) ([]byte, error) {
209+
func (m *MetadataHelper) decodeCompressedData(data []byte, rawLength uint64, compressionType backuppb.CompressionType) ([]byte, error) {
205210
switch compressionType {
206211
case backuppb.CompressionType_UNKNOWN:
207212
return data, nil
208213
case backuppb.CompressionType_ZSTD:
209-
return m.decoder.DecodeAll(data, nil)
214+
return m.decoder.DecodeAll(data, make([]byte, 0, rawLength))
210215
}
211216
return nil, errors.Errorf(
212217
"failed to decode compressed data: compression type is unimplemented. type id is %d", compressionType)
@@ -246,12 +251,15 @@ func (m *MetadataHelper) ReadFile(
246251
path string,
247252
offset uint64,
248253
length uint64,
254+
rawLength uint64,
249255
compressionType backuppb.CompressionType,
250256
storage storeapi.Storage,
251257
encryptionInfo *encryptionpb.FileEncryptionInfo,
252258
) ([]byte, error) {
253259
var err error
260+
m.cacheMu.RLock()
254261
cref, exist := m.cache[path]
262+
m.cacheMu.RUnlock()
255263
if !exist {
256264
// Only files from metaV2 are cached,
257265
// so the file should be from metaV1.
@@ -268,29 +276,33 @@ func (m *MetadataHelper) ReadFile(
268276
if err != nil {
269277
return nil, errors.Trace(err)
270278
}
271-
return m.decodeCompressedData(decryptedData, compressionType)
279+
return m.decodeCompressedData(decryptedData, rawLength, compressionType)
272280
}
273281

282+
cref.mu.Lock()
274283
cref.ref -= 1
275284

276285
if len(cref.data) == 0 {
277286
cref.data, err = storage.ReadFile(ctx, path)
278287
if err != nil {
288+
cref.mu.Unlock()
279289
return nil, errors.Trace(err)
280290
}
281291
}
282-
// decrypt if needed
283-
decryptedData, err := m.verifyChecksumAndDecryptIfNeeded(ctx, cref.data[offset:offset+length], encryptionInfo)
284-
if err != nil {
285-
return nil, errors.Trace(err)
286-
}
287-
buf, err := m.decodeCompressedData(decryptedData, compressionType)
288-
292+
data := cref.data[offset : offset+length]
289293
if cref.ref <= 0 {
290294
// need reset reference information.
291295
cref.data = nil
292296
cref.ref = cref.init_ref
293297
}
298+
cref.mu.Unlock()
299+
300+
// decrypt if needed
301+
decryptedData, err := m.verifyChecksumAndDecryptIfNeeded(ctx, data, encryptionInfo)
302+
if err != nil {
303+
return nil, errors.Trace(err)
304+
}
305+
buf, err := m.decodeCompressedData(decryptedData, rawLength, compressionType)
294306

295307
return buf, errors.Trace(err)
296308
}

0 commit comments

Comments
 (0)