
Commit 7193da4

add test

1 parent 3db502c commit 7193da4


5 files changed: +625, -131 lines


pkg/executor/importer/cloud_sdk.go

Lines changed: 74 additions & 82 deletions
@@ -17,6 +17,7 @@ package importer
 import (
 	"context"
 	"database/sql"
+	"path/filepath"
 	"strings"
 	"sync"

@@ -39,6 +40,9 @@ type CloudImportSDK interface {
 	// GetTableMetaByName returns metadata for a specific table
 	GetTableMetaByName(ctx context.Context, schema, table string) (*TableMeta, error)

+	// GetTotalSize returns the cumulative size (in bytes) of all data files under the source path
+	GetTotalSize(ctx context.Context) (int64, error)
+
 	// Close releases resources used by the SDK
 	Close() error
 }
@@ -89,10 +93,11 @@ func NewImportSDK(ctx context.Context, sourcePath string, db *sql.DB, options ..
 	}

 	ldrCfg := mydump.LoaderConfig{
-		SourceURL:   sourcePath,
-		FileRouters: cfg.fileRouteRules,
-		// Use default rules only if no custom rules are provided
+		SourceURL:        sourcePath,
+		Filter:           cfg.filter,
+		FileRouters:      cfg.fileRouteRules,
 		DefaultFileRules: len(cfg.fileRouteRules) == 0,
+		CharacterSet:     cfg.charset,
 	}

 	loader, err := mydump.NewLoaderWithStore(ctx, ldrCfg, store)
@@ -115,18 +120,22 @@ type SDKOption func(*sdkConfig)
 type sdkConfig struct {
 	// Loader options
-	concurrency    int
-	sqlMode        mysql.SQLMode
-	fileRouteRules []*config.FileRouteRule
+	concurrency    int
+	sqlMode        mysql.SQLMode
+	fileRouteRules []*config.FileRouteRule
+	filter         []string
+	charset        string

 	// General options
 	logger log.Logger
 }

 func defaultSDKConfig() *sdkConfig {
 	return &sdkConfig{
-		concurrency: 4,
-		logger:      log.L(),
+		concurrency: 4,
+		filter:      config.GetDefaultFilter(),
+		logger:      log.L(),
+		charset:     "auto",
 	}
 }

@@ -153,18 +162,34 @@ func WithSQLMode(mode mysql.SQLMode) SDKOption {
 	}
 }

+// WithFilter specifies a filter for the loader
+func WithFilter(filter []string) SDKOption {
+	return func(cfg *sdkConfig) {
+		cfg.filter = filter
+	}
+}
+
 // WithFileRouters specifies custom file routing rules
 func WithFileRouters(routers []*config.FileRouteRule) SDKOption {
 	return func(cfg *sdkConfig) {
 		cfg.fileRouteRules = routers
 	}
 }

+// WithCharset specifies the character set for import (default "auto").
+func WithCharset(cs string) SDKOption {
+	return func(cfg *sdkConfig) {
+		if cs != "" {
+			cfg.charset = cs
+		}
+	}
+}
+
 // CreateSchemasAndTables implements the CloudImportSDK interface
 func (sdk *ImportSDK) CreateSchemasAndTables(ctx context.Context) error {
 	dbMetas := sdk.loader.GetDatabases()
 	if len(dbMetas) == 0 {
-		return errors.New("no database schemas found in source path")
+		return errors.New("no databases found in the source path")
 	}

 	// Create all schemas and tables
@@ -196,12 +221,15 @@ func (sdk *ImportSDK) GetTablesMeta(ctx context.Context) ([]*TableMeta, error) {
 			for _, file := range tblMeta.DataFiles {
 				allDataFiles[file.FileMeta.Path] = file
 			}
+			if tblMeta.SchemaFile.FileMeta.Path != "" {
+				allDataFiles[tblMeta.SchemaFile.FileMeta.Path] = tblMeta.SchemaFile
+			}
 		}
 	}

 	for _, dbMeta := range dbMetas {
 		for _, tblMeta := range dbMeta.Tables {
-			tableMeta, err := sdk.buildTableMeta(ctx, dbMeta, tblMeta, allDataFiles)
+			tableMeta, err := sdk.buildTableMeta(dbMeta, tblMeta, allDataFiles)
 			if err != nil {
 				return nil, errors.Wrapf(err, "failed to build metadata for table %s.%s",
 					dbMeta.Name, tblMeta.Name)
@@ -217,13 +245,16 @@ func (sdk *ImportSDK) GetTablesMeta(ctx context.Context) ([]*TableMeta, error) {
 func (sdk *ImportSDK) GetTableMetaByName(ctx context.Context, schema, table string) (*TableMeta, error) {
 	dbMetas := sdk.loader.GetDatabases()

-	// Collect all data files for pattern matching
+	// Collect all data files (and schema files) for pattern matching
 	allDataFiles := make(map[string]mydump.FileInfo)
 	for _, dbMeta := range dbMetas {
 		for _, tblMeta := range dbMeta.Tables {
 			for _, file := range tblMeta.DataFiles {
 				allDataFiles[file.FileMeta.Path] = file
 			}
+			if tblMeta.SchemaFile.FileMeta.Path != "" {
+				allDataFiles[tblMeta.SchemaFile.FileMeta.Path] = tblMeta.SchemaFile
+			}
 		}
 	}

@@ -238,7 +269,7 @@ func (sdk *ImportSDK) GetTableMetaByName(ctx context.Context, schema, table stri
 				continue
 			}

-			return sdk.buildTableMeta(ctx, dbMeta, tblMeta, allDataFiles)
+			return sdk.buildTableMeta(dbMeta, tblMeta, allDataFiles)
 		}

 		return nil, errors.Errorf("table '%s' not found in schema '%s'", table, schema)
@@ -247,9 +278,21 @@ func (sdk *ImportSDK) GetTableMetaByName(ctx context.Context, schema, table stri
 	return nil, errors.Errorf("schema '%s' not found", schema)
 }

+// GetTotalSize implements CloudImportSDK interface
+func (sdk *ImportSDK) GetTotalSize(ctx context.Context) (int64, error) {
+	tables, err := sdk.GetTablesMeta(ctx)
+	if err != nil {
+		return 0, err
+	}
+	var total int64
+	for _, tbl := range tables {
+		total += tbl.TotalSize
+	}
+	return total, nil
+}
+
 // buildTableMeta creates a TableMeta from database and table metadata
 func (sdk *ImportSDK) buildTableMeta(
-	ctx context.Context,
 	dbMeta *mydump.MDDatabaseMeta,
 	tblMeta *mydump.MDTableMeta,
 	allDataFiles map[string]mydump.FileInfo,
@@ -279,7 +322,7 @@ func (sdk *ImportSDK) buildTableMeta(
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-	tableMeta.WildcardPath = wildcard
+	tableMeta.WildcardPath = strings.TrimSuffix(sdk.store.URI(), "/") + "/" + wildcard

 	return tableMeta, nil
 }
@@ -289,8 +332,10 @@ func (sdk *ImportSDK) Close() error {
 	sdk.mu.Lock()
 	defer sdk.mu.Unlock()

-	// Nothing to close at the moment, but this could be used
-	// to clean up resources in the future
+	// close external storage
+	if sdk.store != nil {
+		sdk.store.Close()
+	}
 	return nil
 }

@@ -342,7 +387,6 @@ func (sdk *ImportSDK) generateWildcard(
 	// Try different pattern generation strategies in order of specificity
 	patterns := []string{
 		generateMydumperPattern(paths),     // Specific to Mydumper format
-		generateDirectoryPattern(paths),    // Try directory-based pattern
 		generatePrefixSuffixPattern(paths), // Generic prefix/suffix pattern
 	}

@@ -356,51 +400,17 @@ func (sdk *ImportSDK) generateWildcard(
 	return "", errors.New("unable to generate a specific wildcard pattern for this table's data files")
 }

-// generateDirectoryPattern attempts to create a pattern based on directory structure
-func generateDirectoryPattern(paths []string) string {
-	// Get common directory prefix
-	dirPrefix := extractCommonDirectory(paths)
-	if dirPrefix == "" {
-		return ""
-	}
-
-	// See if all files are in the same directory
-	allSameDir := true
-	for _, path := range paths {
-		lastSlash := strings.LastIndex(path, "/")
-		if lastSlash < 0 || path[:lastSlash+1] != dirPrefix {
-			allSameDir = false
-			break
-		}
-	}
-
-	if allSameDir {
-		// Try to find common filename patterns within the directory
-		fileNames := make([]string, len(paths))
-		for i, path := range paths {
-			fileNames[i] = path[len(dirPrefix):]
-		}
-
-		filePrefix := longestCommonPrefix(fileNames)
-		if filePrefix != "" {
-			return dirPrefix + filePrefix + "*"
-		}
-
-		// If no common filename prefix, just use the directory
-		return dirPrefix + "*"
-	}
-
-	return ""
-}
-
 // validatePattern checks if a wildcard pattern matches only the table's files
 func validatePattern(pattern string, tableFiles map[string]struct{}, allFiles map[string]mydump.FileInfo) bool {
 	if pattern == "" {
 		return false
 	}

 	for path := range allFiles {
-		isMatch := wildcardMatches(pattern, path)
+		isMatch, err := filepath.Match(pattern, path)
+		if err != nil {
+			return false // Invalid pattern
+		}
 		_, isTableFile := tableFiles[path]

 		// If pattern matches a file that's not from our table, it's invalid
@@ -417,23 +427,6 @@ func validatePattern(pattern string, tableFiles map[string]struct{}, allFiles ma
 	return true
 }

-// wildcardMatches checks if a path matches a wildcard pattern
-// This implementation handles patterns with a single * wildcard
-func wildcardMatches(pattern, path string) bool {
-	if !strings.Contains(pattern, "*") {
-		return pattern == path
-	}
-
-	parts := strings.Split(pattern, "*")
-	if len(parts) != 2 {
-		// This implementation only handles a single wildcard
-		return false
-	}
-
-	prefix, suffix := parts[0], parts[1]
-	return strings.HasPrefix(path, prefix) && strings.HasSuffix(path, suffix) && len(path) >= len(prefix)+len(suffix)
-}
-
 // generateMydumperPattern creates a pattern optimized for Mydumper naming conventions
 func generateMydumperPattern(paths []string) string {
 	// Check if paths appear to follow Mydumper naming convention
@@ -580,20 +573,19 @@ func generatePrefixSuffixPattern(paths []string) string {
 		return paths[0]
 	}

-	// Find common prefix and suffix
 	prefix := longestCommonPrefix(paths)
 	suffix := longestCommonSuffix(paths)

-	// If prefix and suffix would overlap, adjust them
-	if len(prefix)+len(suffix) > len(paths[0]) {
-		overlap := len(prefix) + len(suffix) - len(paths[0])
-		suffix = suffix[overlap:]
+	minLen := len(paths[0])
+	for _, p := range paths[1:] {
+		if len(p) < minLen {
+			minLen = len(p)
+		}
 	}
-
-	// Construct pattern with appropriate wildcards
-	if prefix != "" || suffix != "" {
-		return prefix + "*" + suffix
+	maxSuffixLen := minLen - len(prefix)
+	if len(suffix) > maxSuffixLen {
+		suffix = suffix[len(suffix)-maxSuffixLen:]
 	}

-	return ""
+	return prefix + "*" + suffix
 }
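Taken together, the cloud_sdk.go hunks above add a table filter, a character-set option, and a GetTotalSize query to the SDK surface. Below is a rough usage sketch, not part of this commit: the identifiers NewImportSDK, WithFilter, WithCharset, GetTotalSize, and Close come from the diff, while the import path, package name, source URI, and filter pattern are assumed placeholders.

// Hypothetical caller of the SDK surface shown above; identifiers come from
// the diff, everything else (import path, source URI, filter pattern) is a
// placeholder assumption.
package cloudimport

import (
	"context"
	"database/sql"
	"fmt"

	"github.com/pingcap/tidb/pkg/executor/importer" // assumed module path for pkg/executor/importer
)

// reportDumpSize creates the import SDK against a dump location, restricts it
// to one schema, and prints the total data size it would import.
func reportDumpSize(ctx context.Context, db *sql.DB) error {
	sdk, err := importer.NewImportSDK(ctx, "s3://example-bucket/dump/", db, // placeholder source URI
		importer.WithFilter([]string{"app.*"}), // new option from this commit (placeholder pattern)
		importer.WithCharset("utf8mb4"),        // new option from this commit
	)
	if err != nil {
		return err
	}
	defer sdk.Close()

	total, err := sdk.GetTotalSize(ctx) // new interface method
	if err != nil {
		return err
	}
	fmt.Printf("total data size: %d bytes\n", total)
	return nil
}

Since GetTotalSize is implemented by summing TableMeta.TotalSize over the result of GetTablesMeta, it costs one metadata pass rather than a separate storage scan.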

pkg/executor/importer/cloud_sdk_example.go

Whitespace-only changes.
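For reference, the validatePattern hunk above swaps the hand-rolled wildcardMatches helper, where `*` matched any substring including path separators, for path/filepath.Match, whose `*` stops at a separator and which reports malformed patterns as errors. A small standalone illustration of that standard-library behavior, assuming "/" as the path separator; the paths are invented for the example:

package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	// '*' in filepath.Match does not cross path separators.
	ok, err := filepath.Match("dump/db1.tbl1.*.csv", "dump/db1.tbl1.0001.csv")
	fmt.Println(ok, err) // true <nil>

	ok, err = filepath.Match("dump/*", "dump/sub/db1.tbl1.0001.csv")
	fmt.Println(ok, err) // false <nil> -- '*' stops at '/'

	// Malformed patterns are reported instead of silently failing.
	ok, err = filepath.Match("dump/[", "dump/x")
	fmt.Println(ok, err) // false syntax error in pattern
}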
