Skip to content

Commit 67ce9ad

Browse files
committed
parse metadata name flags
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
1 parent b164c62 commit 67ce9ad

3 files changed

Lines changed: 62 additions & 7 deletions

File tree

br/pkg/restore/log_client/log_file_manager.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -236,11 +236,11 @@ func (lm *LogFileManager) loadShiftTS(ctx context.Context) error {
236236
}
237237

238238
func (lm *LogFileManager) streamingMeta(ctx context.Context) (MetaNameIter, error) {
239-
return lm.streamingMetaByTS(ctx)
239+
return lm.streamingMetaByTS(ctx, func(_ string) bool { return true })
240240
}
241241

242-
func (lm *LogFileManager) streamingMetaByTS(ctx context.Context) (MetaNameIter, error) {
243-
it, err := lm.createMetaIterOver(ctx, lm.storage)
242+
func (lm *LogFileManager) streamingMetaByTS(ctx context.Context, cond func(path string) bool) (MetaNameIter, error) {
243+
it, err := lm.createMetaIterOver(ctx, lm.storage, cond)
244244
if err != nil {
245245
return nil, err
246246
}
@@ -250,15 +250,15 @@ func (lm *LogFileManager) streamingMetaByTS(ctx context.Context) (MetaNameIter,
250250
return filtered, nil
251251
}
252252

253-
func (lm *LogFileManager) createMetaIterOver(ctx context.Context, s storeapi.Storage) (MetaNameIter, error) {
253+
func (lm *LogFileManager) createMetaIterOver(ctx context.Context, s storeapi.Storage, cond func(path string) bool) (MetaNameIter, error) {
254254
opt := &storeapi.WalkOption{SubDir: stream.GetStreamBackupMetaPrefix()}
255255
names := []string{}
256256
err := s.WalkDir(ctx, opt, func(path string, size int64) error {
257257
if !strings.HasSuffix(path, ".meta") {
258258
return nil
259259
}
260260
newPath := stream.FilterPathByTs(path, lm.shiftStartTS, lm.restoreTS)
261-
if len(newPath) > 0 {
261+
if len(newPath) > 0 && cond(newPath) {
262262
names = append(names, newPath)
263263
}
264264
return nil
@@ -347,7 +347,13 @@ func (lm *LogFileManager) collectDDLFilesAndPrepareCache(
347347
// LoadDDLFiles loads all DDL files needs to be restored in the restoration.
348348
// This function returns all DDL files needing directly because we need sort all of them.
349349
func (lm *LogFileManager) LoadDDLFiles(ctx context.Context) ([]Log, error) {
350-
m, err := lm.streamingMeta(ctx)
350+
m, err := lm.streamingMetaByTS(ctx, func(path string) bool {
351+
parsedName, err := stream.TryParseTaggedBackupMetaFileNameWrapper(path)
352+
if err != nil {
353+
return true
354+
}
355+
return parsedName.HasDDLFiles()
356+
})
351357
if err != nil {
352358
return nil, err
353359
}

br/pkg/stream/backupmetas/parser.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ const (
3232
NameMinBeginTsInDefaultCfTag byte = 'd'
3333
NameMinTSTag byte = 'l'
3434
NameMaxTSTag byte = 'u'
35+
NameFlagsTag byte = 'p'
36+
)
37+
38+
const (
39+
flagHasDDLFiles = 1 << iota
3540
)
3641

3742
var (
@@ -46,6 +51,9 @@ type ParsedName struct {
4651
MinBeginTsInDefaultCf uint64
4752
MinTS uint64
4853
MaxTS uint64
54+
Flags uint64
55+
56+
HasFlags bool
4957
}
5058

5159
type ShiftTSStatus uint8
@@ -77,6 +85,13 @@ func (parsedName *ParsedName) CalculateShiftTS(startTS uint64, restoreTS uint64)
7785
return parsedName.MinBeginTsInDefaultCf, ShiftTSFound
7886
}
7987

88+
func (parsedName *ParsedName) HasDDLFiles() bool {
89+
if !parsedName.HasFlags {
90+
return true
91+
}
92+
return parsedName.Flags&flagHasDDLFiles != 0
93+
}
94+
8095
// TryParseTaggedBackupMetaFileName parses the tagged backupmeta file-name format.
8196
func TryParseTaggedBackupMetaFileName(fileName string) (ParsedName, error) {
8297
if !taggedBackupMetaPattern.MatchString(fileName) {
@@ -138,6 +153,8 @@ func parseTaggedBackupMetaFileName(fileName string) (ParsedName, error) {
138153
minBeginTsInDefaultCf uint64
139154
minTs uint64
140155
maxTs uint64
156+
flags uint64
157+
hasFlags bool
141158
seenTags [256]bool
142159
)
143160
for pos := 0; pos < len(suffix); {
@@ -165,6 +182,9 @@ func parseTaggedBackupMetaFileName(fileName string) (ParsedName, error) {
165182
minTs = value
166183
case NameMaxTSTag:
167184
maxTs = value
185+
case NameFlagsTag:
186+
hasFlags = true
187+
flags = value
168188
}
169189
pos += taggedMetaTagValueLen
170190
}
@@ -182,6 +202,8 @@ func parseTaggedBackupMetaFileName(fileName string) (ParsedName, error) {
182202
MinBeginTsInDefaultCf: minBeginTsInDefaultCf,
183203
MinTS: minTs,
184204
MaxTS: maxTs,
205+
Flags: flags,
206+
HasFlags: hasFlags,
185207
}, nil
186208
}
187209

br/pkg/stream/stream_mgr_fuzz_test.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ func FuzzParseBackupMetaFileNameRoundTrip(f *testing.F) {
3030
if !isASCIIAlphanumeric(extraTag) ||
3131
extraTag == backupmetas.NameMinBeginTsInDefaultCfTag ||
3232
extraTag == backupmetas.NameMinTSTag ||
33-
extraTag == backupmetas.NameMaxTSTag {
33+
extraTag == backupmetas.NameMaxTSTag ||
34+
extraTag == backupmetas.NameFlagsTag {
3435
extraTag = 'x'
3536
}
3637

@@ -57,6 +58,32 @@ func FuzzParseBackupMetaFileNameRoundTrip(f *testing.F) {
5758
MinTS: minTs,
5859
MaxTS: maxTs,
5960
}, taggedParsed)
61+
require.True(t, taggedParsed.HasDDLFiles())
62+
63+
taggedWithFlagsFileName := fmt.Sprintf(
64+
"%016X%016X-d%016Xu%016Xl%016Xp%016X",
65+
flushTs, storeID, minBeginTsInDefaultCf, maxTs, minTs, uint64(1),
66+
)
67+
taggedWithFlagsParsed, err := backupmetas.ParseName(taggedWithFlagsFileName)
68+
require.NoError(t, err)
69+
require.Equal(t, backupmetas.ParsedName{
70+
FlushTS: flushTs,
71+
StoreID: storeID,
72+
MinBeginTsInDefaultCf: minBeginTsInDefaultCf,
73+
MinTS: minTs,
74+
MaxTS: maxTs,
75+
Flags: 1,
76+
HasFlags: true,
77+
}, taggedWithFlagsParsed)
78+
require.True(t, taggedWithFlagsParsed.HasDDLFiles())
79+
80+
taggedWithoutDDLFileName := fmt.Sprintf(
81+
"%016X%016X-d%016Xu%016Xl%016Xp%016X",
82+
flushTs, storeID, minBeginTsInDefaultCf, maxTs, minTs, uint64(0),
83+
)
84+
taggedWithoutDDLParsed, err := backupmetas.ParseName(taggedWithoutDDLFileName)
85+
require.NoError(t, err)
86+
require.False(t, taggedWithoutDDLParsed.HasDDLFiles())
6087

6188
tagValues := map[byte]uint64{
6289
backupmetas.NameMinBeginTsInDefaultCfTag: minBeginTsInDefaultCf,

0 commit comments

Comments
 (0)