Skip to content

Commit 4b0c751

Browse files
committed
try to skip downloading metadata when calculating the shift ts
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
1 parent dae8762 commit 4b0c751

7 files changed

Lines changed: 91 additions & 27 deletions

File tree

br/pkg/restore/log_client/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ go_library(
3737
"//br/pkg/restore/tiflashrec",
3838
"//br/pkg/restore/utils",
3939
"//br/pkg/stream",
40+
"//br/pkg/stream/backupmetas",
4041
"//br/pkg/summary",
4142
"//br/pkg/utils",
4243
"//br/pkg/utils/consts",

br/pkg/restore/log_client/log_file_manager.go

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/pingcap/tidb/br/pkg/encryption"
2020
berrors "github.com/pingcap/tidb/br/pkg/errors"
2121
"github.com/pingcap/tidb/br/pkg/stream"
22+
"github.com/pingcap/tidb/br/pkg/stream/backupmetas"
2223
"github.com/pingcap/tidb/br/pkg/utils"
2324
"github.com/pingcap/tidb/br/pkg/utils/consts"
2425
"github.com/pingcap/tidb/br/pkg/utils/iter"
@@ -180,15 +181,37 @@ func (lm *LogFileManager) loadShiftTS(ctx context.Context) error {
180181
// use start ts to calculate shift start ts
181182
lm.startTS,
182183
lm.restoreTS,
183-
lm.metadataDownloadBatchSize, func(path string, raw []byte) error {
184+
lm.metadataDownloadBatchSize,
185+
func(filename string) bool {
186+
parsedName, err := stream.TryParseTaggedBackupMetaFileNameWrapper(filename)
187+
if err != nil {
188+
return false
189+
}
190+
ts, status := parsedName.CalculateShiftTS(lm.startTS, lm.restoreTS)
191+
switch status {
192+
case backupmetas.ShiftTSFound:
193+
case backupmetas.ShiftTSNotFound:
194+
return true
195+
default:
196+
return false
197+
}
198+
shiftTS.Lock()
199+
if !shiftTS.exists || shiftTS.value > ts {
200+
shiftTS.value = ts
201+
shiftTS.exists = true
202+
}
203+
shiftTS.Unlock()
204+
return true
205+
},
206+
func(filename string, raw []byte) error {
184207
m, err := lm.helper.ParseToMetadata(raw)
185208
if err != nil {
186209
return err
187210
}
188-
log.Info("read meta from storage and parse", zap.String("path", path), zap.Uint64("min-ts", m.MinTs),
211+
log.Info("read meta from storage and parse", zap.String("path", filename), zap.Uint64("min-ts", m.MinTs),
189212
zap.Uint64("max-ts", m.MaxTs), zap.Int32("meta-version", int32(m.MetaVersion)))
190213

191-
ts, ok := stream.UpdateShiftTS(m, lm.startTS, lm.restoreTS)
214+
ts, ok := stream.UpdateShiftTSFromMetadata(m, lm.startTS, lm.restoreTS)
192215
shiftTS.Lock()
193216
if ok && (!shiftTS.exists || shiftTS.value > ts) {
194217
shiftTS.value = ts
@@ -206,7 +229,7 @@ func (lm *LogFileManager) loadShiftTS(ctx context.Context) error {
206229
lm.withMigrationBuilder.SetShiftStartTS(lm.shiftStartTS)
207230
return nil
208231
}
209-
lm.shiftStartTS = shiftTS.value
232+
lm.shiftStartTS = min(lm.startTS, shiftTS.value)
210233
lm.withMigrationBuilder.SetShiftStartTS(lm.shiftStartTS)
211234
return nil
212235
}
@@ -332,18 +355,6 @@ func (lm *LogFileManager) LoadDDLFiles(ctx context.Context) ([]Log, error) {
332355
return lm.collectDDLFilesAndPrepareCache(ctx, mg)
333356
}
334357

335-
type loadDMLFilesConfig struct {
336-
Statistic *logFilesStatistic
337-
}
338-
339-
type loadDMLFilesOption func(*loadDMLFilesConfig)
340-
341-
func lDOptWithStatistics(s *logFilesStatistic) loadDMLFilesOption {
342-
return func(c *loadDMLFilesConfig) {
343-
c.Statistic = s
344-
}
345-
}
346-
347358
// LoadDMLFiles loads all DML files that need to be restored in the restoration.
348359
// This function returns a stream, because there are usually many DML files that need to be restored.
349360
func (lm *LogFileManager) LoadDMLFiles(ctx context.Context) (LogIter, error) {

br/pkg/stream/backupmetas/parser.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,14 @@ type ParsedName struct {
4848
MaxTS uint64
4949
}
5050

51+
type ShiftTSStatus uint8
52+
53+
const (
54+
ShiftTSFound ShiftTSStatus = iota
55+
ShiftTSNotFound
56+
ShiftTSInvalidStats
57+
)
58+
5159
func ParseName(fileName string) (ParsedName, error) {
5260
switch {
5361
case taggedBackupMetaPattern.MatchString(fileName):
@@ -59,6 +67,24 @@ func ParseName(fileName string) (ParsedName, error) {
5967
}
6068
}
6169

70+
func (parsedName *ParsedName) CalculateShiftTS(startTS uint64, restoreTS uint64) (uint64, ShiftTSStatus) {
71+
if parsedName.MinTS > restoreTS || parsedName.MaxTS < startTS {
72+
return 0, ShiftTSNotFound
73+
}
74+
if parsedName.MinBeginTsInDefaultCf == 0 || parsedName.MinBeginTsInDefaultCf > parsedName.MinTS {
75+
return 0, ShiftTSInvalidStats
76+
}
77+
return parsedName.MinBeginTsInDefaultCf, ShiftTSFound
78+
}
79+
80+
// TryParseTaggedBackupMetaFileName parses the tagged backupmeta file-name format.
81+
func TryParseTaggedBackupMetaFileName(fileName string) (ParsedName, error) {
82+
if !taggedBackupMetaPattern.MatchString(fileName) {
83+
return ParsedName{}, errors.Errorf("invalid latest backupmeta file name format: %s", fileName)
84+
}
85+
return parseTaggedBackupMetaFileName(fileName)
86+
}
87+
6288
func parseLegacyBackupMetaFileName(fileName string) (ParsedName, error) {
6389
parts := strings.Split(fileName, "-")
6490
if len(parts) != legacyBackupMetaPartCount {

br/pkg/stream/export_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@ package stream
1616

1717
import (
1818
"context"
19+
"math"
1920

2021
backuppb "github.com/pingcap/kvproto/pkg/brpb"
22+
"github.com/pingcap/tidb/pkg/objstore/storeapi"
2123
)
2224

2325
func (s *StreamBackupSearch) SearchFromDataFileForTest(
@@ -33,3 +35,9 @@ func (s *StreamBackupSearch) MergeCFEntriesForTest(
3335
) []*StreamKVInfo {
3436
return s.mergeCFEntries(defaultCFEntries, writeCFEntries)
3537
}
38+
39+
// LoadFrom loads data from an external storage into the stream metadata set. (Now only for test)
40+
func (ms *StreamMetadataSet) LoadFrom(ctx context.Context, s storeapi.Storage) error {
41+
_, err := ms.LoadUntilAndCalculateShiftTS(ctx, s, math.MaxUint64)
42+
return err
43+
}

br/pkg/stream/stream_metas.go

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
berrors "github.com/pingcap/tidb/br/pkg/errors"
2929
"github.com/pingcap/tidb/br/pkg/glue"
3030
"github.com/pingcap/tidb/br/pkg/logutil"
31+
"github.com/pingcap/tidb/br/pkg/stream/backupmetas"
3132
"github.com/pingcap/tidb/br/pkg/utils/consts"
3233
"github.com/pingcap/tidb/br/pkg/utils/iter"
3334
"github.com/pingcap/tidb/pkg/objstore"
@@ -112,7 +113,8 @@ func (ms *StreamMetadataSet) LoadUntilAndCalculateShiftTS(
112113
err := FastUnmarshalMetaData(ctx, s,
113114
0,
114115
until,
115-
ms.MetadataDownloadBatchSize, func(path string, raw []byte) error {
116+
ms.MetadataDownloadBatchSize,
117+
func(string) bool { return false }, func(filename string, raw []byte) error {
116118
m, err := ms.Helper.ParseToMetadataHard(raw)
117119
if err != nil {
118120
return err
@@ -136,15 +138,15 @@ func (ms *StreamMetadataSet) LoadUntilAndCalculateShiftTS(
136138
})
137139
}
138140
metadataMap.Lock()
139-
metadataMap.metas[path] = &MetadataInfo{
141+
metadataMap.metas[filename] = &MetadataInfo{
140142
MinTS: m.MinTs,
141143
FileGroupInfos: fileGroupInfos,
142144
}
143145
metadataMap.Unlock()
144146
}
145147
// filter out the metadatas whose ts-range is overlap with [until, +inf)
146148
// and calculate their minimum begin-default-ts
147-
ts, ok := UpdateShiftTS(m, until, mathutil.MaxUint)
149+
ts, ok := UpdateShiftTS(filename, m, until, mathutil.MaxUint)
148150
if ok {
149151
metadataMap.Lock()
150152
if ts < metadataMap.shiftUntilTS {
@@ -164,12 +166,6 @@ func (ms *StreamMetadataSet) LoadUntilAndCalculateShiftTS(
164166
return metadataMap.shiftUntilTS, nil
165167
}
166168

167-
// LoadFrom loads data from an external storage into the stream metadata set. (Now only for test)
168-
func (ms *StreamMetadataSet) LoadFrom(ctx context.Context, s storeapi.Storage) error {
169-
_, err := ms.LoadUntilAndCalculateShiftTS(ctx, s, math.MaxUint64)
170-
return err
171-
}
172-
173169
func (ms *StreamMetadataSet) iterateDataFiles(f func(d *FileGroupInfo) (shouldBreak bool)) {
174170
for _, m := range ms.metadataInfos {
175171
if slices.ContainsFunc(m.FileGroupInfos, f) {
@@ -295,7 +291,26 @@ func SetTSToFile(
295291
return truncateAndWrite(ctx, s, filename, []byte(content))
296292
}
297293

298-
func UpdateShiftTS(m *pb.Metadata, startTS uint64, restoreTS uint64) (uint64, bool) {
294+
func TryParseTaggedBackupMetaFileNameWrapper(filename string) (backupmetas.ParsedName, error) {
295+
baseName := strings.TrimSuffix(path.Base(filename), metaSuffix)
296+
return backupmetas.TryParseTaggedBackupMetaFileName(baseName)
297+
}
298+
299+
func UpdateShiftTS(filename string, m *pb.Metadata, startTS, restoreTS uint64) (uint64, bool) {
300+
parsedName, err := TryParseTaggedBackupMetaFileNameWrapper(filename)
301+
if err == nil {
302+
ts, status := parsedName.CalculateShiftTS(startTS, restoreTS)
303+
switch status {
304+
case backupmetas.ShiftTSFound:
305+
return ts, true
306+
case backupmetas.ShiftTSNotFound:
307+
return 0, false
308+
}
309+
}
310+
return UpdateShiftTSFromMetadata(m, startTS, restoreTS)
311+
}
312+
313+
func UpdateShiftTSFromMetadata(m *pb.Metadata, startTS uint64, restoreTS uint64) (uint64, bool) {
299314
var (
300315
minBeginTS uint64
301316
isExist bool

br/pkg/stream/stream_mgr.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,7 @@ func FastUnmarshalMetaData(
404404
startTS uint64,
405405
endTS uint64,
406406
metaDataWorkerPoolSize uint,
407+
skipCondition func(filename string) bool,
407408
fn func(path string, rawMetaData []byte) error,
408409
) error {
409410
log.Info("use workers to speed up reading metadata files", zap.Uint("workers", metaDataWorkerPoolSize))
@@ -423,6 +424,9 @@ func FastUnmarshalMetaData(
423424
)
424425
return nil
425426
}
427+
if skipCondition(path) {
428+
return nil
429+
}
426430
pool.ApplyOnErrorGroup(eg, func() error {
427431
b, err := s.ReadFile(ectx, readPath)
428432
if err != nil {

br/pkg/utils/iter/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ go_library(
1515
"//pkg/util",
1616
"@com_github_prometheus_client_golang//prometheus",
1717
"@org_golang_x_exp//constraints",
18-
"@org_golang_x_sync//errgroup",
1918
],
2019
)
2120

0 commit comments

Comments
 (0)