diff --git a/dumpling/export/BUILD.bazel b/dumpling/export/BUILD.bazel index df738585def55..de486433e52fb 100644 --- a/dumpling/export/BUILD.bazel +++ b/dumpling/export/BUILD.bazel @@ -17,7 +17,9 @@ go_library( "retry.go", "sql.go", "sql_type.go", + "sql_util.go", "status.go", + "string_chunking.go", "task.go", "util.go", "writer.go", @@ -89,7 +91,9 @@ go_test( "prepare_test.go", "sql_test.go", "sql_type_test.go", + "sql_util_test.go", "status_test.go", + "string_chunking_test.go", "util_for_test.go", "util_test.go", "writer_serial_test.go", diff --git a/dumpling/export/dump.go b/dumpling/export/dump.go index 2fb57b4eaf00c..84d152dc65234 100644 --- a/dumpling/export/dump.go +++ b/dumpling/export/dump.go @@ -14,6 +14,7 @@ import ( "slices" "strconv" "strings" + "sync" "sync/atomic" "time" @@ -54,6 +55,9 @@ var errEmptyHandleVals = errors.New("empty handleVals for TiDB table") // see https://docs.pingcap.com/zh/tidb/dev/system-variables#tidb_enable_paging-%E4%BB%8E-v540-%E7%89%88%E6%9C%AC%E5%BC%80%E5%A7%8B%E5%BC%95%E5%85%A5 var enablePagingVersion = semver.New("6.2.0") +// Maximum number of chunks to prevent infinite loops during boundary sampling +const maxChunkLimit = 1000000 + // Dumper is the dump progress structure type Dumper struct { tctx *tcontext.Context @@ -73,6 +77,8 @@ type Dumper struct { charsetAndDefaultCollationMap map[string]string speedRecorder *SpeedRecorder + + chunkedTables sync.Map } // NewDumper returns a new Dumper @@ -355,14 +361,8 @@ func (d *Dumper) startWriters(tctx *tcontext.Context, wg *errgroup.Group, taskCh writer := NewWriter(tctx, int64(i), conf, conn, d.extStore, d.metrics) writer.rebuildConnFn = rebuildConnFn writer.setFinishTableCallBack(func(task Task) { + // this is called when a file is finished. if _, ok := task.(*TaskTableData); ok { - IncCounter(d.metrics.finishedTablesCounter) - // FIXME: actually finishing the last chunk doesn't means this table is 'finished'. 
- // We can call this table is 'finished' if all its chunks are finished. - // Comment this log now to avoid ambiguity. - // tctx.L().Debug("finished dumping table data", - // zap.String("database", td.Meta.DatabaseName()), - // zap.String("table", td.Meta.TableName())) failpoint.Inject("EnableLogProgress", func() { time.Sleep(1 * time.Second) tctx.L().Debug("EnableLogProgress, sleep 1s") @@ -373,6 +373,22 @@ func (d *Dumper) startWriters(tctx *tcontext.Context, wg *errgroup.Group, taskCh IncGauge(d.metrics.taskChannelCapacity) if td, ok := task.(*TaskTableData); ok { d.metrics.completedChunks.Add(1) + if val, ok := d.chunkedTables.Load(td.Meta.ChunkKey()); ok { + chunkStats := val.(*tableChunkStat) + finishedChunks := chunkStats.finished.Add(1) + if chunkStats.finalized.Load() && finishedChunks == chunkStats.sent.Load() { + // LoadAndDelete is the atomic claim: only the winner + // sees `loaded==true`. The producer defer uses the + // same pattern, so the counter increments exactly + // once regardless of which side reaches the + // termination condition first. + if _, loaded := d.chunkedTables.LoadAndDelete(td.Meta.ChunkKey()); loaded { + IncCounter(d.metrics.finishedTablesCounter) + } + } + } else { + IncCounter(d.metrics.finishedTablesCounter) + } tctx.L().Debug("finish dumping table data task", zap.String("database", td.Meta.DatabaseName()), zap.String("table", td.Meta.TableName()), @@ -661,9 +677,25 @@ func (d *Dumper) dumpTableData(tctx *tcontext.Context, conn *BaseConn, meta Tabl return nil } - // Update total rows - fieldName, _ := pickupPossibleField(tctx, meta, conn) - c := estimateCount(tctx, meta.DatabaseName(), meta.TableName(), conn, fieldName, conf) + // Update total rows. Use "*" as the estimation column for string-leading + // composite keys; EXPLAIN on a single varchar column under-estimates and + // would otherwise make estimateTotalRowsCounter lag behind the chunked + // path's estimate (see concurrentDumpTable). 
+ fields, isStringField, err := pickupPossibleField(tctx, meta, conn) + if err != nil { + tctx.L().Debug("pickupPossibleField failed for row estimate", + zap.String("database", meta.DatabaseName()), + zap.String("table", meta.TableName()), + log.ShortError(err)) + } + estimateField := "" + if len(fields) > 0 { + estimateField = fields[0] + if isStringField { + estimateField = "*" + } + } + c := estimateCount(tctx, meta.DatabaseName(), meta.TableName(), conn, estimateField, conf) AddCounter(d.metrics.estimateTotalRowsCounter, float64(c)) if conf.Rows == UnspecifiedSize { @@ -710,6 +742,13 @@ func (d *Dumper) buildConcatTask(tctx *tcontext.Context, conn *BaseConn, meta Ta task := <-tableChan handleSubTask(task) } + // concurrentDumpTable may have registered a chunkedTables entry + // for this meta, but handleSubTask intercepts every sub-task + // without bumping `finished`, so the inner defer leaves the + // entry behind with finished < sent. Drop it now so the concat + // (or fallback) task is counted exactly once via the + // no-tracking branch in startWriters. 
+ d.chunkedTables.Delete(meta.ChunkKey()) if len(tableDataArr) <= 1 { return nil, nil } @@ -781,9 +820,10 @@ func (d *Dumper) sequentialDumpTable(tctx *tcontext.Context, conn *BaseConn, met } // concurrentDumpTable tries to split table into several chunks to dump -func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *BaseConn, meta TableMeta, taskChan chan<- Task) error { +func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *BaseConn, meta TableMeta, taskChan chan<- Task) (err error) { conf := d.conf db, tbl := meta.DatabaseName(), meta.TableName() + if conf.ServerInfo.ServerType == version.ServerTypeTiDB && conf.ServerInfo.ServerVersion != nil && (conf.ServerInfo.ServerVersion.Compare(*tableSampleVersion) >= 0 || @@ -811,19 +851,50 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *BaseConn, met return err } - field, err := pickupPossibleField(tctx, meta, conn) - if err != nil || field == "" { - // skip split chunk logic if not found proper field + fields, isStringField, err := pickupPossibleField(tctx, meta, conn) + if err != nil || len(fields) == 0 { tctx.L().Info("fallback to sequential dump due to no proper field. This won't influence the whole dump process", zap.String("database", db), zap.String("table", tbl), log.ShortError(err)) return d.dumpWholeTableDirectly(tctx, meta, taskChan, "", orderByClause, 0, 1) } + field := fields[0] + + // For composite string keys EXPLAIN on a single column under-estimates; + // use * so the row estimation covers the whole row. 
+ estimateField := field + if isStringField { + estimateField = "*" + } - count := estimateCount(d.tctx, db, tbl, conn, field, conf) - tctx.L().Info("get estimated rows count", + count := estimateCount(d.tctx, db, tbl, conn, estimateField, conf) + tctx.L().Debug("get estimated rows count", zap.String("database", db), zap.String("table", tbl), - zap.Uint64("estimateCount", count)) + zap.Uint64("estimateCount", count), + zap.Strings("fields", fields), + zap.Bool("isStringField", isStringField)) + + // EXPLAIN can under-estimate (0, or a small value like 1) on freshly + // populated tables whose InnoDB rows statistic hasn't refreshed yet. + // For string chunking a pessimistic estimate silently drops us into + // the sequential path, so when the EXPLAIN result is below the chunk + // threshold we verify with a direct COUNT(*) before giving up on + // parallelism. COUNT is authoritative so it's safe to replace count. + if isStringField && count < conf.Rows { + countQuery := fmt.Sprintf("SELECT COUNT(*) FROM `%s`.`%s` %s", + escapeString(db), escapeString(tbl), buildWhereCondition(conf, "")) + var directCount sql.NullInt64 + // simpleQueryWithArgs already iterates rows.Next(); the handler + // must not call Next() itself or it will skip past the (single) + // COUNT row. + err := conn.QuerySQL(tctx, func(rows *sql.Rows) error { + return rows.Scan(&directCount) + }, func() { directCount = sql.NullInt64{} }, countQuery) + if err == nil && directCount.Valid && directCount.Int64 > 0 { + count = uint64(directCount.Int64) + } + } + if count < conf.Rows { // skip chunk logic if estimates are low tctx.L().Info("fallback to sequential dump due to estimate count < rows. 
This won't influence the whole dump process", @@ -834,6 +905,10 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *BaseConn, met return d.dumpWholeTableDirectly(tctx, meta, taskChan, "", orderByClause, 0, 1) } + if isStringField { + return d.concurrentDumpStringFields(tctx, conn, meta, taskChan, fields, orderByClause, count) + } + minv, maxv, err := d.selectMinAndMaxIntValue(tctx, conn, db, tbl, field) if err != nil { tctx.L().Info("fallback to sequential dump due to cannot get bounding values. This won't influence the whole dump process", @@ -858,6 +933,16 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *BaseConn, met chunkIndex := 0 nullValueCondition := "" + chunkStats := newTableChunkStat() + d.chunkedTables.Store(meta.ChunkKey(), chunkStats) + defer func() { + chunkStats.finalized.Store(true) + if chunkStats.finished.Load() == chunkStats.sent.Load() { + if _, loaded := d.chunkedTables.LoadAndDelete(meta.ChunkKey()); loaded { + IncCounter(d.metrics.finishedTablesCounter) + } + } + }() if conf.Where == "" { nullValueCondition = fmt.Sprintf("`%s` IS NULL OR ", escapeString(field)) } @@ -880,6 +965,11 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *BaseConn, met } func (d *Dumper) sendTaskToChan(tctx *tcontext.Context, task Task, taskChan chan<- Task) (ctxDone bool) { + if td, ok := task.(*TaskTableData); ok { + if val, ok := d.chunkedTables.Load(td.Meta.ChunkKey()); ok { + val.(*tableChunkStat).sent.Add(1) + } + } select { case <-tctx.Done(): return true @@ -891,6 +981,54 @@ func (d *Dumper) sendTaskToChan(tctx *tcontext.Context, task Task, taskChan chan } } +// tableChunkStat tracks per-table chunk progress so finishedTablesCounter +// is incremented exactly once, when the last chunk of a table is written +// (rather than when the first chunk finishes, which was the old behavior +// noted by the FIXME in startWriters). 
+// +// Termination handshake: the producer defer (after it has finished every +// sendTaskToChan call) sets finalized=true and then checks +// `finished == sent`. The consumer callback (startWriters) increments +// finished and then checks `finalized && finished == sent`. Both sides +// can observe the same termination condition concurrently, so the +// IncCounter + Delete pair must be atomic w.r.t. the other side. +// Both sites call chunkedTables.LoadAndDelete and only the side that +// gets `loaded==true` performs the increment. sent.Add happens +// synchronously inside sendTaskToChan before the blocking send, so +// once finalized flips true no new sent.Add can race in. +type tableChunkStat struct { + sent *gatomic.Int32 + finished *gatomic.Int32 + finalized *gatomic.Bool +} + +func newTableChunkStat() *tableChunkStat { + return &tableChunkStat{ + sent: gatomic.NewInt32(0), + finished: gatomic.NewInt32(0), + finalized: gatomic.NewBool(false), + } +} + +// beginChunkTracking registers a chunk-stat entry for meta and returns a +// finalizer that closes the producer side of the termination handshake. Use +// this in producers that emit multiple TaskTableData tasks per table; without +// it, every task hits the no-tracking branch in startWriters and +// finishedTablesCounter is incremented once per chunk instead of once per +// table. 
+func (d *Dumper) beginChunkTracking(meta TableMeta) func() { + chunkStats := newTableChunkStat() + d.chunkedTables.Store(meta.ChunkKey(), chunkStats) + return func() { + chunkStats.finalized.Store(true) + if chunkStats.finished.Load() == chunkStats.sent.Load() { + if _, loaded := d.chunkedTables.LoadAndDelete(meta.ChunkKey()); loaded { + IncCounter(d.metrics.finishedTablesCounter) + } + } + } +} + func (d *Dumper) selectMinAndMaxIntValue(tctx *tcontext.Context, conn *BaseConn, db, tbl, field string) (minv, maxv *big.Int, err error) { conf, zero := d.conf, &big.Int{} query := fmt.Sprintf("SELECT MIN(`%s`),MAX(`%s`) FROM `%s`.`%s`", @@ -958,6 +1096,7 @@ func (d *Dumper) concurrentDumpTiDBTables(tctx *tcontext.Context, conn *BaseConn if err != nil { return err } + defer d.beginChunkTracking(meta)() return d.sendConcurrentDumpTiDBTasks(tctx, meta, taskChan, handleColNames, handleVals, "", 0, len(handleVals)+1) } @@ -982,6 +1121,7 @@ func (d *Dumper) concurrentDumpTiDBPartitionTablesWithTableSample(tctx *tcontext totalChunk += len(handleVals) + 1 } startChunk := 0 + defer d.beginChunkTracking(meta)() for i, partition := range d.conf.Partitions { err = d.sendConcurrentDumpTiDBTasks(tctx, meta, taskChan, pkFields, cachedHandleVals[i], partition, startChunk, totalChunk) if err != nil { @@ -1014,6 +1154,8 @@ func (d *Dumper) concurrentDumpTiDBPartitionTables(tctx *tcontext.Context, conn totalChunk += len(handleVals) + 1 cachedHandleVals[i] = handleVals } + + defer d.beginChunkTracking(meta)() for i, partition := range partitions { err := d.sendConcurrentDumpTiDBTasks(tctx, meta, taskChan, handleColNames, cachedHandleVals[i], partition, startChunkIdx, totalChunk) if err != nil { @@ -2037,5 +2179,53 @@ func (d *Dumper) renewSelectTableRegionFuncForLowerTiDB(tctx *tcontext.Context) func (d *Dumper) newTaskTableData(meta TableMeta, data TableDataIR, currentChunk, totalChunks int) *TaskTableData { d.metrics.totalChunks.Add(1) - return NewTaskTableData(meta, data, currentChunk, 
totalChunks) + // Chunking mode is already set at table level in concurrentDumpTable + task := NewTaskTableData(meta, data, currentChunk, totalChunks) + return task +} + +// extractOrderByColumns extracts column names from ORDER BY clause +// Input: "ORDER BY `item_id`,`photo_index`" +// Output: ["`item_id`", "`photo_index`"] +// Handles column names that contain commas by respecting backtick quoting +func extractOrderByColumns(orderByClause string) []string { + // Remove "ORDER BY " prefix + columnsStr := strings.TrimPrefix(orderByClause, "ORDER BY ") + + // Handle empty clause: returning nil (rather than []string{""}) signals + // "no chunking columns" so callers can bail out instead of synthesizing + // SELECT lists or WHERE predicates from an empty column name. The prefix + // "ORDER BY " is matched case-sensitively with a single space; callers + // should normalize before passing in. + if columnsStr == "" { + return nil + } + + var columns []string + var currentColumn strings.Builder + inBackticks := false + + for i := range len(columnsStr) { + ch := columnsStr[i] + + if ch == '`' { + inBackticks = !inBackticks + currentColumn.WriteByte(ch) + } else if ch == ',' && !inBackticks { + // Found a column separator outside of backticks + if col := strings.TrimSpace(currentColumn.String()); col != "" { + columns = append(columns, col) + } + currentColumn.Reset() + } else { + currentColumn.WriteByte(ch) + } + } + + // Add the last column + if col := strings.TrimSpace(currentColumn.String()); col != "" { + columns = append(columns, col) + } + + return columns } diff --git a/dumpling/export/dump_test.go b/dumpling/export/dump_test.go index 479790dae4270..9025cb51218a6 100644 --- a/dumpling/export/dump_test.go +++ b/dumpling/export/dump_test.go @@ -5,6 +5,7 @@ package export import ( "context" "fmt" + "sync" "sync/atomic" "testing" "time" @@ -904,3 +905,45 @@ func TestSetSessionParams(t *testing.T) { err = setSessionParam(d) require.NoError(t, err) } + +func 
TestTableChunkStatTracking(t *testing.T) { + // Test table chunk statistics for streaming chunk progress tracking + + // Test initialization + stats := newTableChunkStat() + require.NotNil(t, stats, "Should create new chunk statistics") + require.Equal(t, int32(0), stats.sent.Load(), "Initial sent count should be 0") + require.Equal(t, int32(0), stats.finished.Load(), "Initial finished count should be 0") + require.False(t, stats.finalized.Load(), "Initial finalized status should be false") + + // Test concurrent updates + var wg sync.WaitGroup + concurrency := 10 + wg.Add(concurrency * 2) + + // Simulate multiple goroutines sending chunks + for i := 0; i < concurrency; i++ { + go func() { + defer wg.Done() + stats.sent.Add(1) + }() + } + + // Simulate multiple goroutines finishing chunks + for i := 0; i < concurrency; i++ { + go func() { + defer wg.Done() + stats.finished.Add(1) + }() + } + + wg.Wait() + + // Verify counts + require.Equal(t, int32(concurrency), stats.sent.Load(), "Sent count should match concurrent additions") + require.Equal(t, int32(concurrency), stats.finished.Load(), "Finished count should match concurrent additions") + + // Test finalization + stats.finalized.Store(true) + require.True(t, stats.finalized.Load(), "Finalized status should be true after setting") +} diff --git a/dumpling/export/ir.go b/dumpling/export/ir.go index 136cce41f0880..1e1780d7208b7 100644 --- a/dumpling/export/ir.go +++ b/dumpling/export/ir.go @@ -36,6 +36,7 @@ type TableMeta interface { ShowCreateView() string AvgRowLength() uint64 HasImplicitRowID() bool + ChunkKey() string ColumnInfos() []*ColumnInfo } diff --git a/dumpling/export/ir_impl_test.go b/dumpling/export/ir_impl_test.go index eefd954d46aac..44310ad29b4e1 100644 --- a/dumpling/export/ir_impl_test.go +++ b/dumpling/export/ir_impl_test.go @@ -128,3 +128,41 @@ func TestChunkRowIter(t *testing.T) { require.Error(t, sqlRowIter.Decode(res)) sqlRowIter.Next() } + +func TestRowIterWithStringKeyProgress(t 
*testing.T) { + // Test row iteration with progress tracking for string key chunking + + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer func() { + _ = db.Close() + }() + + // Simulate a chunk from streaming string key chunking + expectedRows := mock.NewRows([]string{"id", "data"}). + AddRow("key_001", "data1"). + AddRow("key_002", "data2"). + AddRow("key_003", "data3") + + mock.ExpectQuery("SELECT id, data FROM table WHERE.*").WillReturnRows(expectedRows) + rows, err := db.Query("SELECT id, data FROM table WHERE id >= 'key_001' AND id < 'key_100'") + require.NoError(t, err) + + iter := newRowIter(rows, 2) + + // Test that iteration works with string-based chunking results + rowCount := 0 + res := newSimpleRowReceiver(2) + + for iter.HasNext() { + require.NoError(t, iter.Decode(res)) + require.True(t, strings.HasPrefix(res.data[0], "key_"), "Should have key prefix") + require.True(t, strings.HasPrefix(res.data[1], "data"), "Should have data prefix") + iter.Next() + rowCount++ + } + + require.Equal(t, 3, rowCount, "Should process all rows in the chunk") + require.False(t, iter.HasNext(), "Should reach end of iteration") + require.NoError(t, iter.Close(), "Should close iterator cleanly") +} diff --git a/dumpling/export/metadata.go b/dumpling/export/metadata.go index 8f3b692322371..3a89fa10cfc93 100644 --- a/dumpling/export/metadata.go +++ b/dumpling/export/metadata.go @@ -27,6 +27,10 @@ type globalMetadata struct { storage storeapi.Storage } +func (t *tableMeta) ChunkKey() string { + return t.DatabaseName() + "." 
+ t.TableName() +} + const ( metadataPath = "metadata" metadataTimeLayout = time.DateTime diff --git a/dumpling/export/sql.go b/dumpling/export/sql.go index a7510d8f7478d..71aef0c013c77 100644 --- a/dumpling/export/sql.go +++ b/dumpling/export/sql.go @@ -1296,19 +1296,29 @@ func simpleQueryWithArgs(ctx context.Context, conn *sql.Conn, handleOneRow func( return errors.Annotatef(rows.Err(), "sql: %s, args: %s", query, args) } -func pickupPossibleField(tctx *tcontext.Context, meta TableMeta, db *BaseConn) (string, error) { - // try using _tidb_rowid first +// pickupPossibleField selects columns to chunk on. It prefers a numeric +// handle (_tidb_rowid or a numeric PK/UK), and falls back to the columns +// of a string-or-mixed PK/UK when no numeric handle exists. The returned +// isString is true when the first chunking column is a string type; +// string chunking callers consume the full slice, numeric callers just +// use the first element. +func pickupPossibleField(tctx *tcontext.Context, meta TableMeta, db *BaseConn) ([]string, bool, error) { if meta.HasImplicitRowID() { - return "_tidb_rowid", nil + return []string{"_tidb_rowid"}, false, nil } - // try to use pk or uk fieldName, err := getNumericIndex(tctx, db, meta) if err != nil { - return "", err + return nil, false, err } - - // if fieldName == "", there is no proper index - return fieldName, nil + if fieldName != "" { + return []string{fieldName}, false, nil + } + // No numeric handle — try string / mixed composite index. 
+ fields, isStrings, err := getStringOrNumericIndexColumns(tctx, db, meta) + if err != nil || len(fields) == 0 { + return nil, false, err + } + return fields, isStrings[0], nil } func estimateCount(tctx *tcontext.Context, dbName, tableName string, db *BaseConn, field string, conf *Config) uint64 { diff --git a/dumpling/export/sql_test.go b/dumpling/export/sql_test.go index 67b10b11a77cd..06c30817814a7 100644 --- a/dumpling/export/sql_test.go +++ b/dumpling/export/sql_test.go @@ -1801,11 +1801,15 @@ func TestPickupPossibleField(t *testing.T) { mock.ExpectQuery(query).WillReturnRows(rows) } - field, err := pickupPossibleField(tctx, meta, baseConn) + fields, _, err := pickupPossibleField(tctx, meta, baseConn) if expectedErr != nil { require.ErrorIs(t, err, expectedErr) } else { require.NoError(t, err) + field := "" + if len(fields) > 0 { + field = fields[0] + } require.Equal(t, testCase.expectedField, field) } require.NoError(t, mock.ExpectationsWereMet()) diff --git a/dumpling/export/sql_util.go b/dumpling/export/sql_util.go new file mode 100644 index 0000000000000..9410f32c4119c --- /dev/null +++ b/dumpling/export/sql_util.go @@ -0,0 +1,277 @@ +package export + +import ( + "bytes" + "fmt" + "math" + "strconv" + "strings" + + tcontext "github.com/pingcap/tidb/dumpling/context" +) + +// getStringOrNumericIndexColumns picks up indices for chunking, including string columns +// follows same priority: primary key > unique key with smallest count > key with max cardinality +// Returns all columns of the selected index for proper composite key handling +func getStringOrNumericIndexColumns(tctx *tcontext.Context, db *BaseConn, meta TableMeta) ([]string, []bool, error) { + database, table := meta.DatabaseName(), meta.TableName() + colName2Type := string2Map(meta.ColumnNames(), meta.ColumnTypes()) + keyQuery := fmt.Sprintf("SHOW INDEX FROM `%s`.`%s`", escapeString(database), escapeString(table)) + results, err := db.QuerySQLWithColumns(tctx, []string{"NON_UNIQUE", 
"SEQ_IN_INDEX", "KEY_NAME", "COLUMN_NAME", "CARDINALITY"}, keyQuery) + if err != nil { + return nil, nil, err + } + + type keyInfo struct { + columns []string + isStrings []bool + count uint64 + } + var ( + uniqueKeyMap = map[string]*keyInfo{} // unique key name -> key info + bestKey *keyInfo + maxCardinality int64 = -1 + ) + + // Group columns by key name and sequence + keyColumns := make(map[string]map[uint64]string) // keyName -> seqIndex -> columnName + for _, oneRow := range results { + _, seqInIndex, keyName, colName, _ := oneRow[0], oneRow[1], oneRow[2], oneRow[3], oneRow[4] + seqInIndexInt, err := strconv.ParseUint(seqInIndex, 10, 64) + if err != nil { + continue + } + + if keyColumns[keyName] == nil { + keyColumns[keyName] = make(map[uint64]string) + } + keyColumns[keyName][seqInIndexInt] = colName + } + + // Process each key to build complete column lists + for _, oneRow := range results { + nonUnique, seqInIndex, keyName, _, cardinality := oneRow[0], oneRow[1], oneRow[2], oneRow[3], oneRow[4] + + // Only process the first column of each key to avoid duplicates + if seqInIndex != "1" { + continue + } + + // Build complete column list for this key + var columns []string + var isStrings []bool + keyColMap := keyColumns[keyName] + + // Add columns in sequence order + for seq := uint64(1); seq <= uint64(len(keyColMap)); seq++ { + if colName, exists := keyColMap[seq]; exists { + colType := colName2Type[colName] + _, isNumeric := dataTypeInt[colType] + _, isString := dataTypeString[colType] + + // Accept both numeric and string columns for chunking + if !isNumeric && !isString { + // If any column in the key is not numeric/string, skip this key + columns = nil + isStrings = nil + break + } + columns = append(columns, colName) + isStrings = append(isStrings, isString) + } + } + + if len(columns) == 0 { + continue + } + + keyInfoObj := &keyInfo{ + columns: columns, + isStrings: isStrings, + count: uint64(len(columns)), + } + + switch { + case keyName == 
"PRIMARY": + return columns, isStrings, nil + case nonUnique == "0": + uniqueKeyMap[keyName] = keyInfoObj + case len(uniqueKeyMap) == 0: + cardinalityInt, err := strconv.ParseInt(cardinality, 10, 64) + if err == nil && cardinalityInt > maxCardinality { + bestKey = keyInfoObj + maxCardinality = cardinalityInt + } + } + } + + if len(uniqueKeyMap) > 0 { + var minCols uint64 = math.MaxUint64 + for _, keyInfo := range uniqueKeyMap { + if keyInfo.count < minCols { + bestKey = keyInfo + minCols = keyInfo.count + } + } + } + + if bestKey != nil { + return bestKey.columns, bestKey.isStrings, nil + } + return nil, nil, nil +} + +// pickupPossibleFieldsForStringChunking returns all columns of the selected index for composite key chunking +// escapeSQLString properly escapes a string for use in internal SQL boundary condition queries. +// This function is used only for generating WHERE clauses in chunking and pagination queries, +// and does NOT affect the dumpling data output format. Data output escaping is controlled +// separately by the global --escape-backslash flag in the WriteToBuffer methods. +func escapeSQLString(s string) string { + var buf bytes.Buffer + buf.WriteByte('\'') + // Use the existing escapeSQL function with escapeBackslash=true for proper SQL escaping + escapeSQL([]byte(s), &buf, true) + buf.WriteByte('\'') + return buf.String() +} + +// Helper functions for streaming boundary generation + +// buildCursorWhereClause builds a WHERE clause for cursor-based pagination using OR conditions +// For composite keys: WHERE col1 > val1 OR (col1 = val1 AND col2 > val2) OR (col1 = val1 AND col2 = val2 AND col3 > val3) ... 
+func buildCursorWhereClause(columnNames []string, boundary []string) string { + if len(boundary) == 0 || len(columnNames) == 0 { + return "" + } + + quotedCols := make([]string, len(columnNames)) + escapedBoundary := make([]string, len(columnNames)) + + for i, col := range columnNames { + // Check if column is already quoted with backticks + if strings.HasPrefix(col, "`") && strings.HasSuffix(col, "`") { + quotedCols[i] = col // Already quoted, use as-is + } else { + quotedCols[i] = fmt.Sprintf("`%s`", escapeString(col)) // Quote and escape + } + } + + for i, val := range boundary { + if val != "" { + escapedBoundary[i] = escapeSQLString(val) + } else { + escapedBoundary[i] = "''" + } + } + + conditions := make([]string, 0, len(quotedCols)) + + // Generate OR conditions for cursor-based pagination + // col1 > val1 OR (col1 = val1 AND col2 > val2) OR (col1 = val1 AND col2 = val2 AND col3 > val3) ... + for i := range quotedCols { + var condition strings.Builder + + // Add equality conditions for previous columns + if i > 0 { + condition.WriteString("(") + for j := range i { + if j > 0 { + condition.WriteString(" AND ") + } + fmt.Fprintf(&condition, "%s = %s", quotedCols[j], escapedBoundary[j]) + } + condition.WriteString(" AND ") + } + + // Add greater than condition for current column + if i == len(quotedCols)-1 { + fmt.Fprintf(&condition, "%s >= %s", quotedCols[i], escapedBoundary[i]) + } else { + fmt.Fprintf(&condition, "%s > %s", quotedCols[i], escapedBoundary[i]) + } + + if i > 0 { + condition.WriteString(")") + } + + conditions = append(conditions, condition.String()) + } + + return strings.Join(conditions, " OR ") +} + +// buildUpperBoundWhereClause builds a WHERE clause with only an upper bound using OR conditions +// For composite keys: WHERE col1 < val1 OR (col1 = val1 AND col2 < val2) OR (col1 = val1 AND col2 = val2 AND col3 < val3) ... 
+func buildUpperBoundWhereClause(columnNames []string, upperBoundary []string) string { + if len(upperBoundary) == 0 || len(columnNames) == 0 { + return "" + } + + quotedCols := make([]string, len(columnNames)) + escapedBoundary := make([]string, len(columnNames)) + + for i, col := range columnNames { + // Check if column is already quoted with backticks + if strings.HasPrefix(col, "`") && strings.HasSuffix(col, "`") { + quotedCols[i] = col // Already quoted, use as-is + } else { + quotedCols[i] = fmt.Sprintf("`%s`", escapeString(col)) // Quote and escape + } + } + + for i, val := range upperBoundary { + if val != "" { + escapedBoundary[i] = escapeSQLString(val) + } else { + escapedBoundary[i] = "''" + } + } + + conditions := make([]string, 0, len(quotedCols)) + + // Generate OR conditions for upper bound + // col1 < val1 OR (col1 = val1 AND col2 < val2) OR (col1 = val1 AND col2 = val2 AND col3 < val3) ... + for i := range quotedCols { + var condition strings.Builder + + // Add equality conditions for previous columns + if i > 0 { + condition.WriteString("(") + for j := range i { + if j > 0 { + condition.WriteString(" AND ") + } + fmt.Fprintf(&condition, "%s = %s", quotedCols[j], escapedBoundary[j]) + } + condition.WriteString(" AND ") + } + + // Add less than condition for current column + fmt.Fprintf(&condition, "%s < %s", quotedCols[i], escapedBoundary[i]) + + if i > 0 { + condition.WriteString(")") + } + + conditions = append(conditions, condition.String()) + } + + return strings.Join(conditions, " OR ") +} + +// buildBoundedWhereClause builds a WHERE clause with both bounds using OR conditions +// For composite keys: WHERE (lower_conditions) AND (upper_conditions) +func buildBoundedWhereClause(columnNames []string, lowerBoundary, upperBoundary []string) string { + if len(lowerBoundary) == 0 || len(upperBoundary) == 0 || len(columnNames) == 0 { + return "" + } + + lowerClause := buildCursorWhereClause(columnNames, lowerBoundary) + upperClause := 
buildUpperBoundWhereClause(columnNames, upperBoundary) + + if lowerClause == "" || upperClause == "" { + return "" + } + + return fmt.Sprintf("(%s) AND (%s)", lowerClause, upperClause) +} diff --git a/dumpling/export/sql_util_test.go b/dumpling/export/sql_util_test.go new file mode 100644 index 0000000000000..c914b83b850b0 --- /dev/null +++ b/dumpling/export/sql_util_test.go @@ -0,0 +1,94 @@ +package export + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestEscapeSQLStringAdvanced(t *testing.T) { + // Test advanced SQL string escaping scenarios + + testCases := []struct { + name string + input string + expected string + }{ + { + name: "simple string", + input: "hello", + expected: "'hello'", + }, + { + name: "string with single quote", + input: "don't", + expected: "'don\\'t'", + }, + { + name: "string with multiple quotes", + input: "it's a 'test'", + expected: "'it\\'s a \\'test\\''", + }, + { + name: "string with double quotes", + input: `say "hello"`, + expected: `'say \"hello\"'`, + }, + { + name: "string with backslash", + input: `path\to\file`, + expected: `'path\\to\\file'`, + }, + { + name: "empty string", + input: "", + expected: "''", + }, + { + name: "string with special characters", + input: "value_x123*(", + expected: "'value_x123*('", + }, + { + name: "problematic boundary case", + input: "value_y456)'", + expected: `'value_y456)\''`, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := escapeSQLString(tc.input) + require.Equal(t, tc.expected, result, "Failed for input: %s", tc.input) + + // Verify the result is properly quoted + require.True(t, len(result) >= 2, "Result should be at least 2 characters (quotes)") + require.Equal(t, "'", string(result[0]), "Should start with single quote") + require.Equal(t, "'", string(result[len(result)-1]), "Should end with single quote") + }) + } +} + +func TestColumnQuotingInWhereClauses(t *testing.T) { + // Test proper column name quoting in 
WHERE clause generation + + // Test with already quoted columns + columnNames := []string{"`user_id`", "`created_at`"} + boundary := []string{"user123", "2023-01-01"} + + result := buildCursorWhereClause(columnNames, boundary) + expected := "`user_id` > 'user123' OR (`user_id` = 'user123' AND `created_at` >= '2023-01-01')" + require.Equal(t, expected, result, "Should handle pre-quoted column names correctly") + + // Test with unquoted columns + columnNames = []string{"user_id", "created_at"} + result = buildCursorWhereClause(columnNames, boundary) + expected = "`user_id` > 'user123' OR (`user_id` = 'user123' AND `created_at` >= '2023-01-01')" + require.Equal(t, expected, result, "Should quote unquoted column names") + + // Test mixed quoting + columnNames = []string{"`user_id`", "created_at"} + result = buildCursorWhereClause(columnNames, boundary) + expected = "`user_id` > 'user123' OR (`user_id` = 'user123' AND `created_at` >= '2023-01-01')" + require.Equal(t, expected, result, "Should handle mixed quoting correctly") +} diff --git a/dumpling/export/status.go b/dumpling/export/status.go index 0a861f4c40677..e571e570555fc 100644 --- a/dumpling/export/status.go +++ b/dumpling/export/status.go @@ -83,6 +83,22 @@ func (d *Dumper) GetStatus() *DumpStatus { } else { ret.Progress = fmt.Sprintf("%5.2f %%", progress*100) } + } else { + // Streaming tables set totalChunks incrementally and never flip + // progressReady, so surface best-effort progress from the + // in-flight counters. totalChunks is loaded before + // completedChunks, so concurrent task completions can briefly + // push the ratio above 1; clamp for consistency with the + // progressReady branch above. 
+ totalChunks := d.metrics.totalChunks.Load() + completedChunks := d.metrics.completedChunks.Load() + if totalChunks > 0 { + progress := float64(completedChunks) / float64(totalChunks) + if progress > 1 { + progress = 1 + } + ret.Progress = fmt.Sprintf("%5.2f %% (streaming)", progress*100) + } } return ret } diff --git a/dumpling/export/string_chunking.go b/dumpling/export/string_chunking.go new file mode 100644 index 0000000000000..52a9538950b01 --- /dev/null +++ b/dumpling/export/string_chunking.go @@ -0,0 +1,316 @@ +// Copyright 2025 PingCAP, Inc. Licensed under Apache-2.0. + +package export + +import ( + "database/sql" + "fmt" + "strconv" + "strings" + + tcontext "github.com/pingcap/tidb/dumpling/context" + "go.uber.org/zap" +) + +// concurrentDumpStringFields handles composite-key chunking where the +// leading index column is a string. The caller (concurrentDumpTable) has +// already decided the table is large enough to chunk, so no small-table +// guard is needed here. +func (d *Dumper) concurrentDumpStringFields(tctx *tcontext.Context, conn *BaseConn, meta TableMeta, taskChan chan<- Task, fields []string, orderByClause string, estimatedCount uint64) error { + chunkSize := int64(d.conf.Rows) + numChunks := (int64(estimatedCount) + chunkSize - 1) / chunkSize + selectField, selectLen := meta.SelectedField(), meta.SelectedLen() + + tctx.L().Info("starting streaming string-based chunking", + zap.String("database", meta.DatabaseName()), + zap.String("table", meta.TableName()), + zap.Strings("fields", fields), + zap.Int64("estimatedChunks", numChunks), + zap.Int64("chunkSize", chunkSize)) + + return d.streamStringChunks(tctx, conn, meta, taskChan, fields, orderByClause, chunkSize, selectField, selectLen) +} + +// streamStringChunks generates boundaries incrementally and sends tasks with buffering to handle last chunk detection. +// The estimated chunk count is only used in the caller's startup log; the loop itself streams until the cursor returns no more rows. 
+func (d *Dumper) streamStringChunks(tctx *tcontext.Context, conn *BaseConn, meta TableMeta, taskChan chan<- Task, fields []string, orderByClause string, chunkSize int64, selectField string, selectLen int) error { + conf := d.conf + db, tbl := meta.DatabaseName(), meta.TableName() + + // For boundary sampling, we need to select all columns used in ORDER BY + // Extract column names from ORDER BY clause and select them for boundary sampling + // This ensures we get complete composite key values for proper WHERE clause generation + + // Parse ORDER BY clause to extract all column names + // orderByClause format: "ORDER BY `item_id`,`photo_index`" + orderByColumns := extractOrderByColumns(orderByClause) + + // Build SELECT columns for boundary sampling (all ORDER BY columns) + selectCols := strings.Join(orderByColumns, ", ") + + tctx.L().Debug("boundary sampling setup", + zap.String("database", db), + zap.String("table", tbl), + zap.Strings("chunkingFields", fields), + zap.Strings("orderByColumns", orderByColumns), + zap.String("dataOrderBy", orderByClause)) + + // Initialize chunk tracking + chunkStats := newTableChunkStat() + d.chunkedTables.Store(meta.ChunkKey(), chunkStats) + + // Buffering approach: Buffer one chunk to determine if it's the last one + type bufferedChunk struct { + task *TaskTableData + chunkIndex int + } + var buffer *bufferedChunk + + // Helper function to send buffered chunk + sendBufferedChunk := func(isLast bool) error { + if buffer != nil { + // Update the task with proper chunk info + if isLast { + buffer.task.TotalChunks = buffer.chunkIndex + 1 // Now we know the total + } + + ctxDone := d.sendTaskToChan(tctx, buffer.task, taskChan) + if ctxDone { + return tctx.Err() + } + buffer = nil + } + return nil + } + + // True streaming approach: Buffer chunks to handle last chunk detection + var totalChunks int64 + + defer func() { + // Send any remaining buffered chunk as the last chunk + if buffer != nil { + _ = sendBufferedChunk(true) // Mark 
as last chunk + } + + chunkStats.finalized.Store(true) + + tctx.L().Debug("streaming chunking complete", + zap.String("database", db), + zap.String("table", tbl), + zap.Int64("totalChunks", totalChunks)) + + if chunkStats.finished.Load() == chunkStats.sent.Load() { + // Atomic claim via LoadAndDelete: if the last writer callback + // already handled termination, LoadAndDelete returns loaded==false + // and we skip the increment to avoid double-counting the + // finishedTablesCounter. + if _, loaded := d.chunkedTables.LoadAndDelete(meta.ChunkKey()); loaded { + IncCounter(d.metrics.finishedTablesCounter) + } + } + }() + + var previousBoundary []string + // Continue boundary sampling until end of data (ignore numChunks estimate for streaming) + for i := int64(1); ; i++ { + // Check if we've hit the safety limit + if i >= maxChunkLimit { + tctx.L().Warn("hit max chunk limit during boundary sampling", + zap.String("database", db), + zap.String("table", tbl), + zap.Int64("chunkIndex", i), + zap.Int64("maxChunkLimit", maxChunkLimit)) + break + } + + // Sample boundary for chunk i + var sampleQuery string + + // Use cursor-based boundary sampling for optimal performance + if len(previousBoundary) == 0 { + // First boundary: OFFSET is acceptable for the first boundary + offset := chunkSize + sampleQuery = fmt.Sprintf( + "SELECT %s FROM `%s`.`%s` %s LIMIT 1 OFFSET %d", + selectCols, + escapeString(db), + escapeString(tbl), + orderByClause, + offset) + } else { + // Subsequent boundaries: use cursor-based pagination for performance + // Skip chunkSize rows from previous boundary, then take the first row + whereClause := buildCursorWhereClause(orderByColumns, previousBoundary) + fullWhere := buildWhereCondition(conf, whereClause) + sampleQuery = fmt.Sprintf( + "SELECT %s FROM `%s`.`%s` %s %s LIMIT 1 OFFSET %d", + selectCols, + escapeString(db), + escapeString(tbl), + fullWhere, + orderByClause, + chunkSize) // Skip chunkSize more rows from cursor position + } + + 
tctx.L().Debug("sampling boundary", + zap.String("query", sampleQuery), + zap.Int64("chunkIndex", i), + zap.Bool("usingCursor", len(previousBoundary) > 0)) + + // Execute boundary sampling query + var currentBoundary []string + err := conn.QuerySQL(tctx, func(rows *sql.Rows) error { + // We're selecting all ORDER BY columns, not just chunking fields + values := make([]any, len(orderByColumns)) + scanArgs := make([]any, len(orderByColumns)) + for j := range values { + scanArgs[j] = &values[j] + } + + if err := rows.Scan(scanArgs...); err != nil { + return err + } + + currentBoundary = make([]string, len(orderByColumns)) + for j, val := range values { + if val == nil { + currentBoundary[j] = "" + continue + } + // Supported driver-returned types for boundary columns. The + // MySQL driver returns []byte for string/binary/DECIMAL/DATE* + // (parseTime is not enabled, see GetDriverConfig), int64 for + // integers, and float64 for floating-point — all of which + // round-trip safely through the WHERE-clause builders. Any + // other type would silently produce a malformed predicate, so + // fail loudly instead. + switch v := val.(type) { + case string: + currentBoundary[j] = v + case []byte: + currentBoundary[j] = string(v) + case int64: + currentBoundary[j] = strconv.FormatInt(v, 10) + case int32: + currentBoundary[j] = strconv.FormatInt(int64(v), 10) + case int: + currentBoundary[j] = strconv.Itoa(v) + case float64: + currentBoundary[j] = strconv.FormatFloat(v, 'f', -1, 64) + case float32: + currentBoundary[j] = strconv.FormatFloat(float64(v), 'f', -1, 32) + default: + return fmt.Errorf("unsupported boundary column type %T for %s in table `%s`.`%s`", + v, orderByColumns[j], db, tbl) + } + } + return nil + }, func() { + // Reset between QuerySQL retries: drop any partial boundary the + // row callback may have written on the previous attempt, so the + // caller's len(currentBoundary)==0 check after the call reflects + // the *retried* attempt rather than stale state. 
+ currentBoundary = nil + }, sampleQuery) + + if err != nil { + // Don't swallow sampling failures: with `break` the loop would fall + // through to the post-loop branches and either dump the entire table + // as one un-chunked task (when the very first sample fails) or emit + // an oversized tail chunk from the last good boundary to +∞ (when a + // later sample fails). Both silently degrade the dump. Propagate so + // the job-level retry/backoff applies and the caller sees the + // failure. + tctx.L().Warn("failed to sample boundary, aborting string-key chunking", + zap.String("database", db), + zap.String("table", tbl), + zap.Int64("chunkIndex", i), + zap.Error(err)) + return err + } + + if len(currentBoundary) == 0 { + tctx.L().Debug("boundary sampling returned no results - reached end of data", + zap.String("database", db), + zap.String("table", tbl), + zap.Int64("chunkIndex", i)) + break + } + + tctx.L().Debug("sampled boundary", + zap.Int64("boundaryIndex", i), + zap.Strings("boundary", currentBoundary)) + + // Create task for chunk using previousBoundary -> currentBoundary (with buffering) + var newTask *TaskTableData + + if len(previousBoundary) == 0 { + // First chunk: everything up to first boundary + whereClause := buildUpperBoundWhereClause(orderByColumns, currentBoundary) + fullWhere := buildWhereCondition(conf, whereClause) + query := buildSelectQuery(db, tbl, selectField, "", fullWhere, orderByClause) + newTask = d.newTaskTableData(meta, newTableData(query, selectLen, false), int(totalChunks), -1) + } else { + // Intermediate chunk: between previousBoundary and currentBoundary + whereClause := buildBoundedWhereClause(orderByColumns, previousBoundary, currentBoundary) + fullWhere := buildWhereCondition(conf, whereClause) + query := buildSelectQuery(db, tbl, selectField, "", fullWhere, orderByClause) + newTask = d.newTaskTableData(meta, newTableData(query, selectLen, false), int(totalChunks), -1) + } + + // Send previous buffered chunk (now we know 
it's not the last) + if err := sendBufferedChunk(false); err != nil { + return err + } + + // Buffer the new task + buffer = &bufferedChunk{ + task: newTask, + chunkIndex: int(totalChunks), + } + totalChunks++ + + previousBoundary = currentBoundary // Update for next iteration + } + + // After the loop, check if there's any remaining data to dump. + // This happens if the last boundary sampling query returned no results, + // meaning we've reached the end of the table, but there might be a partial chunk left. + if len(previousBoundary) > 0 && totalChunks > 0 { + // Send previous buffered chunk (now we know it's not the last) + if err := sendBufferedChunk(false); err != nil { + return err + } + + // Create and buffer the final chunk + whereClause := buildCursorWhereClause(orderByColumns, previousBoundary) + fullWhere := buildWhereCondition(conf, whereClause) + query := buildSelectQuery(db, tbl, selectField, "", fullWhere, orderByClause) + finalTask := d.newTaskTableData(meta, newTableData(query, selectLen, false), int(totalChunks), -1) + + buffer = &bufferedChunk{ + task: finalTask, + chunkIndex: int(totalChunks), + } + totalChunks++ + // The defer function will send this final chunk marked as last + } else if totalChunks == 0 { + // No boundaries found at all — dump the whole table as one chunk. 
+ query := buildSelectQuery(db, tbl, selectField, "", buildWhereCondition(conf, ""), orderByClause) + task := d.newTaskTableData(meta, newTableData(query, selectLen, false), 0, 1) + + // Single chunk case - send immediately as we know it's the only one + ctxDone := d.sendTaskToChan(tctx, task, taskChan) + if ctxDone { + return tctx.Err() + } + totalChunks++ + } + + tctx.L().Info("completed streaming chunking", + zap.String("database", db), + zap.String("table", tbl), + zap.Int64("chunks", totalChunks)) + + return nil +} diff --git a/dumpling/export/string_chunking_test.go b/dumpling/export/string_chunking_test.go new file mode 100644 index 0000000000000..4e55d19ab8628 --- /dev/null +++ b/dumpling/export/string_chunking_test.go @@ -0,0 +1,162 @@ +// Copyright 2025 PingCAP, Inc. Licensed under Apache-2.0. + +package export + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestEscapeSQLString(t *testing.T) { + // Test the escapeSQLString helper function + testCases := []struct { + input string + expected string + }{ + {"simple", "'simple'"}, + {"test'quote", "'test\\'quote'"}, + {"test\"doublequote", "'test\\\"doublequote'"}, + {"value_a123*(", "'value_a123*('"}, + {"value_b456)'", `'value_b456)\''`}, + {"", "''"}, + } + + for _, tc := range testCases { + result := escapeSQLString(tc.input) + require.Equal(t, tc.expected, result, "Failed for input: %s", tc.input) + } +} + +func TestBuildCursorWhereClause(t *testing.T) { + // Test cursor-based WHERE clause generation for efficient boundary sampling + + // Test with empty inputs + result := buildCursorWhereClause(nil, nil) + require.Empty(t, result, "Should return empty string for nil inputs") + + result = buildCursorWhereClause([]string{}, []string{}) + require.Empty(t, result, "Should return empty string for empty inputs") + + // Test single column cursor + columnNames := []string{"id"} + boundary := []string{"100"} + result = buildCursorWhereClause(columnNames, boundary) + expected := "`id` 
>= '100'" + require.Equal(t, expected, result, "Single column cursor should generate correct WHERE clause") + + // Test composite key cursor (2 columns) + columnNames = []string{"user_id", "created_at"} + boundary = []string{"user123", "2023-01-01"} + result = buildCursorWhereClause(columnNames, boundary) + expected = "`user_id` > 'user123' OR (`user_id` = 'user123' AND `created_at` >= '2023-01-01')" + require.Equal(t, expected, result, "Two column cursor should generate correct OR condition") + + // Test composite key cursor (3 columns) + columnNames = []string{"tenant_id", "user_id", "timestamp"} + boundary = []string{"tenant_a", "user_456", "1640995200"} + result = buildCursorWhereClause(columnNames, boundary) + expected = "`tenant_id` > 'tenant_a' OR (`tenant_id` = 'tenant_a' AND `user_id` > 'user_456') OR (`tenant_id` = 'tenant_a' AND `user_id` = 'user_456' AND `timestamp` >= '1640995200')" + require.Equal(t, expected, result, "Three column cursor should generate correct nested OR conditions") +} + +func TestBuildUpperBoundWhereClause(t *testing.T) { + // Test upper bound WHERE clause generation for chunk boundaries + + // Test with empty inputs + result := buildUpperBoundWhereClause(nil, nil) + require.Empty(t, result, "Should return empty string for nil inputs") + + // Test single column upper bound + columnNames := []string{"id"} + boundary := []string{"500"} + result = buildUpperBoundWhereClause(columnNames, boundary) + expected := "`id` < '500'" + require.Equal(t, expected, result, "Single column upper bound should generate correct WHERE clause") + + // Test composite key upper bound + columnNames = []string{"category", "item_id"} + boundary = []string{"electronics", "12345"} + result = buildUpperBoundWhereClause(columnNames, boundary) + expected = "`category` < 'electronics' OR (`category` = 'electronics' AND `item_id` < '12345')" + require.Equal(t, expected, result, "Composite key upper bound should generate correct OR condition") +} + +func 
TestBuildBoundedWhereClause(t *testing.T) { + // Test bounded WHERE clause generation for chunk ranges + + // Test with empty inputs + result := buildBoundedWhereClause(nil, nil, nil) + require.Empty(t, result, "Should return empty string for nil inputs") + + // Test single column bounded + columnNames := []string{"score"} + lowerBoundary := []string{"50"} + upperBoundary := []string{"100"} + result = buildBoundedWhereClause(columnNames, lowerBoundary, upperBoundary) + expected := "(`score` >= '50') AND (`score` < '100')" + require.Equal(t, expected, result, "Single column bounded should generate correct WHERE clause") + + // Test composite key bounded + columnNames = []string{"region", "sales_date"} + lowerBoundary = []string{"east", "2023-01-01"} + upperBoundary = []string{"west", "2023-12-31"} + result = buildBoundedWhereClause(columnNames, lowerBoundary, upperBoundary) + expected = "(`region` > 'east' OR (`region` = 'east' AND `sales_date` >= '2023-01-01')) AND (`region` < 'west' OR (`region` = 'west' AND `sales_date` < '2023-12-31'))" + require.Equal(t, expected, result, "Composite key bounded should combine lower and upper bounds correctly") +} + +func TestExtractOrderByColumns(t *testing.T) { + // Test ORDER BY clause parsing for composite key chunking + + testCases := []struct { + orderBy string + expected []string + }{ + {"ORDER BY `id`", []string{"`id`"}}, + {"ORDER BY `user_id`,`created_at`", []string{"`user_id`", "`created_at`"}}, + {"ORDER BY `a`, `b`, `c`", []string{"`a`", "`b`", "`c`"}}, + {"ORDER BY id", []string{"id"}}, + {"ORDER BY user_id, created_at DESC", []string{"user_id", "created_at DESC"}}, + {"", nil}, + // Test columns with commas in names + {"ORDER BY `col,1`, `col,2`", []string{"`col,1`", "`col,2`"}}, + {"ORDER BY `col,with,many,commas`, `normal_col`", []string{"`col,with,many,commas`", "`normal_col`"}}, + {"ORDER BY `first name`, `last,name`", []string{"`first name`", "`last,name`"}}, + } + + for _, tc := range testCases { + result := 
extractOrderByColumns(tc.orderBy) + require.Equal(t, tc.expected, result, "Failed to parse ORDER BY: %s", tc.orderBy) + } +} + +func TestStreamingChunkingProgressTracking(t *testing.T) { + // Test progress tracking for streaming string key chunking + + // Test chunk statistics initialization + stats := newTableChunkStat() + require.NotNil(t, stats, "Should create new chunk statistics") + require.Equal(t, int32(0), stats.sent.Load(), "Initial sent count should be 0") + require.Equal(t, int32(0), stats.finished.Load(), "Initial finished count should be 0") + require.False(t, stats.finalized.Load(), "Initial finalized status should be false") + + // Test chunk count updates + stats.sent.Add(1) + require.Equal(t, int32(1), stats.sent.Load(), "Sent count should increment") + + stats.finished.Add(1) + require.Equal(t, int32(1), stats.finished.Load(), "Finished count should increment") + + stats.finalized.Store(true) + require.True(t, stats.finalized.Load(), "Finalized status should be true") +} + +func TestMaxChunkLimitSafety(t *testing.T) { + // Test that the maxChunkLimit constant prevents infinite loops + require.Equal(t, int64(1000000), int64(maxChunkLimit), "Max chunk limit should be set to prevent runaway chunking") + + // Verify the safety check would trigger for large chunk counts + require.Greater(t, int64(maxChunkLimit), int64(100000), "Max chunk limit should be large enough for normal use") + require.Less(t, int64(maxChunkLimit), int64(10000000), "Max chunk limit should prevent excessive memory usage") +} diff --git a/dumpling/export/task.go b/dumpling/export/task.go index 760f5833df017..1c56da07e0ede 100644 --- a/dumpling/export/task.go +++ b/dumpling/export/task.go @@ -104,7 +104,7 @@ func NewTaskPolicyMeta(policyName, createPolicySQL string) *TaskPolicyMeta { } } -// NewTaskTableData returns a new dumping table data task +// NewTaskTableData returns a new dumpling table data task func NewTaskTableData(meta TableMeta, data TableDataIR, currentChunk, 
totalChunks int) *TaskTableData { return &TaskTableData{ Meta: meta, @@ -143,5 +143,11 @@ func (t *TaskPolicyMeta) Brief() string { func (t *TaskTableData) Brief() string { db, tbl := t.Meta.DatabaseName(), t.Meta.TableName() idx, total := t.ChunkIndex, t.TotalChunks + // Streaming string-key chunking buffers tasks before the total is known + // and emits intermediate chunks with TotalChunks=-1; render that as "?" + // so log lines stay readable instead of showing (idx/-1). + if total < 0 { + return fmt.Sprintf("data of table '%s'.'%s'(%d/?)", db, tbl, idx) + } return fmt.Sprintf("data of table '%s'.'%s'(%d/%d)", db, tbl, idx, total) } diff --git a/dumpling/export/util_for_test.go b/dumpling/export/util_for_test.go index 4d9e2ccc77fce..0449004f2c20a 100644 --- a/dumpling/export/util_for_test.go +++ b/dumpling/export/util_for_test.go @@ -257,6 +257,10 @@ func (m *mockTableIR) EscapeBackSlash() bool { return m.escapeBackSlash } +func (m *mockTableIR) ChunkKey() string { + return fmt.Sprintf("%s.%s", m.dbName, m.tblName) +} + func (m *mockTableIR) ColumnInfos() []*ColumnInfo { return m.columnInfos } diff --git a/dumpling/export/writer_serial_test.go b/dumpling/export/writer_serial_test.go index 8dbd2dd5f9fe7..36d168a04e31d 100644 --- a/dumpling/export/writer_serial_test.go +++ b/dumpling/export/writer_serial_test.go @@ -472,3 +472,138 @@ func createMockConfig() *Config { PromRegistry: promutil.NewDefaultRegistry(), } } + +// TestWriteInsertWithStatementSizeLimit tests that when statement size limits are reached, +// the writer correctly splits the output into multiple complete INSERT statements. 
+// +// Expected behavior: +// - When a statement size limit is set, large datasets are split into multiple INSERT statements +// - Each INSERT statement is complete and valid (has INSERT INTO prefix and ends with semicolon) +// - No duplicate or consecutive INSERT INTO prefixes appear in the output +// - Each new statement starts with its own INSERT INTO prefix after the previous one ends +// +// This ensures that each chunk can be executed independently, which is important for +// parallel imports and handling large datasets that exceed size limits. +func TestWriteInsertWithStatementSizeLimit(t *testing.T) { + cfg := createMockConfig() + + // Create test data with enough rows to trigger statement size switching + data := [][]driver.Value{ + {"1", "user1", "user1@example.com"}, + {"2", "user2", "user2@example.com"}, + {"3", "user3", "user3@example.com"}, + {"4", "user4", "user4@example.com"}, + {"5", "user5", "user5@example.com"}, + {"6", "user6", "user6@example.com"}, + } + colTypes := []string{"INT", "VARCHAR", "VARCHAR"} + specCmts := []string{"/*!40101 SET NAMES binary*/;"} + + tableIR := newMockTableIR("test", "users", data, specCmts, colTypes) + bf := NewBufferWriter() + + // Set a very small statement size limit to force statement switching + // This should cause the writer to create multiple INSERT statements + statementSizeLimit := uint64(150) // Small enough to fit only 2-3 rows per statement + conf := configForWriteSQL(cfg, UnspecifiedSize, statementSizeLimit) + m := newMetrics(conf.PromFactory, conf.Labels) + + n, err := WriteInsert(tcontext.Background(), conf, tableIR, tableIR, bf, m) + require.NoError(t, err) + require.Equal(t, uint64(6), n) + + output := bf.String() + + // Verify that we have multiple INSERT statements (due to size limit) + insertCount := strings.Count(output, "INSERT INTO `users` VALUES") + require.Greater(t, insertCount, 1, "Expected multiple INSERT statements due to size limit") + + // Verify each INSERT statement ends with 
semicolon + statements := strings.Split(output, "INSERT INTO `users` VALUES") + for i := 1; i < len(statements); i++ { + trimmed := strings.TrimSpace(statements[i]) + if trimmed != "" { + require.True(t, strings.HasSuffix(trimmed, ";"), "Each INSERT statement should end with semicolon") + } + } + + // Verify all rows are present (simple check) + require.Equal(t, 6, strings.Count(output, "("), "All 6 rows should be present") +} + +// TestWriteInsertWithoutStatementSizeLimit verifies normal behavior when no size limit is set +func TestWriteInsertWithoutStatementSizeLimit(t *testing.T) { + cfg := createMockConfig() + + data := [][]driver.Value{ + {"1", "user1", "user1@example.com"}, + {"2", "user2", "user2@example.com"}, + {"3", "user3", "user3@example.com"}, + } + colTypes := []string{"INT", "VARCHAR", "VARCHAR"} + specCmts := []string{"/*!40101 SET NAMES binary*/;"} + + tableIR := newMockTableIR("test", "users", data, specCmts, colTypes) + bf := NewBufferWriter() + + // No statement size limit + conf := configForWriteSQL(cfg, UnspecifiedSize, UnspecifiedSize) + m := newMetrics(conf.PromFactory, conf.Labels) + + n, err := WriteInsert(tcontext.Background(), conf, tableIR, tableIR, bf, m) + require.NoError(t, err) + require.Equal(t, uint64(3), n) + + output := bf.String() + + // Should have exactly one INSERT statement + insertCount := strings.Count(output, "INSERT INTO `users` VALUES") + require.Equal(t, 1, insertCount, "Expected exactly one INSERT statement when no size limit") + + // Verify the expected output format + expected := "/*!40101 SET NAMES binary*/;\n" + + "INSERT INTO `users` VALUES\n" + + "(1,'user1','user1@example.com'),\n" + + "(2,'user2','user2@example.com'),\n" + + "(3,'user3','user3@example.com');\n" + require.Equal(t, expected, output) +} + +// TestWriteInsertMultipleStatements tests that multiple complete INSERT statements are generated correctly +func TestWriteInsertMultipleStatements(t *testing.T) { + cfg := createMockConfig() + + data := 
[][]driver.Value{ + {"1", "a"}, + {"2", "b"}, + {"3", "c"}, + {"4", "d"}, + } + colTypes := []string{"INT", "VARCHAR"} + + tableIR := newMockTableIR("test", "items", data, nil, colTypes) + bf := NewBufferWriter() + + // Small statement size to force multiple statements + conf := configForWriteSQL(cfg, UnspecifiedSize, 50) + m := newMetrics(conf.PromFactory, conf.Labels) + + n, err := WriteInsert(tcontext.Background(), conf, tableIR, tableIR, bf, m) + require.NoError(t, err) + require.Equal(t, uint64(4), n) + + output := bf.String() + + // Should have multiple INSERT statements + insertCount := strings.Count(output, "INSERT INTO `items` VALUES") + require.Greater(t, insertCount, 1, "Should have multiple INSERT statements") + + // Each INSERT should end with semicolon + parts := strings.Split(output, "INSERT INTO `items` VALUES") + for i := 1; i < len(parts); i++ { + trimmed := strings.TrimSpace(parts[i]) + if trimmed != "" { + require.True(t, strings.HasSuffix(trimmed, ";"), "Each statement should end with semicolon") + } + } +} diff --git a/dumpling/export/writer_util.go b/dumpling/export/writer_util.go index b684b9d6523b1..9f75378fb0ac7 100644 --- a/dumpling/export/writer_util.go +++ b/dumpling/export/writer_util.go @@ -216,6 +216,7 @@ func WriteInsert( selectedField := meta.SelectedField() + // Always write INSERT prefix for each chunk to ensure complete statements // if has generated column if selectedField != "" && selectedField != "*" { insertStatementPrefix = fmt.Sprintf("INSERT INTO %s (%s) VALUES\n", @@ -226,10 +227,14 @@ func WriteInsert( } insertStatementPrefixLen := uint64(len(insertStatementPrefix)) + isFirstChunk := true for fileRowIter.HasNext() { - wp.currentStatementSize = 0 - bf.WriteString(insertStatementPrefix) - wp.AddFileSize(insertStatementPrefixLen) + if isFirstChunk { + wp.currentStatementSize = 0 + bf.WriteString(insertStatementPrefix) + wp.AddFileSize(insertStatementPrefixLen) + isFirstChunk = false + } for fileRowIter.HasNext() { 
lastBfSize := bf.Len() @@ -249,10 +254,17 @@ func WriteInsert( failpoint.Inject("AtEveryRow", nil) fileRowIter.Next() + // Check if we need to end current INSERT statement and start a new one + // This can happen due to file size limit or statement size limit shouldSwitch := wp.ShouldSwitchStatement() - if fileRowIter.HasNext() && !shouldSwitch { + + // Determine row terminator - always use comma for rows within a statement + hasMoreRows := fileRowIter.HasNext() && !shouldSwitch + + if hasMoreRows { bf.WriteString(",\n") } else { + // Always end with semicolon to complete the INSERT statement bf.WriteString(";\n") } if bf.Len() >= lengthLimit { @@ -272,6 +284,10 @@ func WriteInsert( } if shouldSwitch { + // Need to end current INSERT statement due to size limits + wp.currentStatementSize = 0 + // Always restart with INSERT prefix for complete statements + isFirstChunk = true break } } diff --git a/dumpling/tests/composite_string_key/README.md b/dumpling/tests/composite_string_key/README.md new file mode 100644 index 0000000000000..8ff2ab8c8e467 --- /dev/null +++ b/dumpling/tests/composite_string_key/README.md @@ -0,0 +1,60 @@ +# Composite String Key Integration Test + +This test verifies that dumpling correctly handles tables with composite string primary keys using the new streaming string key chunking functionality. 
+ +## Test Cases + +### comp_str_case_0: Basic Composite String Key +- **Table**: `tenant_id` + `user_id` composite primary key +- **Data**: 10 rows with tenant/user combinations +- **Purpose**: Test basic composite string key chunking with varchar columns + +### comp_str_case_1: Special Characters and Edge Cases +- **Table**: `category` + `item_id` composite primary key +- **Data**: Category names containing spaces and ampersands; descriptions containing double quotes, parentheses, and punctuation +- **Purpose**: Test SQL escaping and special character handling in composite keys + +### comp_str_case_2: Three-Column Composite Key +- **Table**: `region` + `country` + `city` composite primary key +- **Data**: Geographical data with three-level hierarchy +- **Purpose**: Test complex composite keys with more than 2 columns + +### comp_str_case_3: Unicode and Emoji Characters +- **Table**: `lang_code` + `message_id` composite primary key +- **Data**: Multi-language content with Unicode and emoji characters +- **Purpose**: Test international character support in string chunking + +## Key Features Tested + +1. **Composite Key Detection**: Verifies dumpling detects multi-column string primary keys +2. **Boundary Generation**: Tests cursor-based boundary sampling for efficient chunking +3. **WHERE Clause Generation**: Validates complex OR-based WHERE clauses for composite keys +4. **Progress Tracking**: Ensures streaming chunking progress is properly tracked +5. **SQL Escaping**: Tests proper escaping of special characters in boundary values +6. 
**Chunk Ordering**: Verifies data is dumped in correct primary key order + +## Expected Behavior + +- Dumpling should automatically detect the composite string primary key +- Tables should be chunked using streaming string key chunking (with --rows 5) +- Each chunk should contain properly ordered data based on the composite key +- Progress tracking should work correctly for streaming chunks +- All special characters should be properly escaped in SQL + +## Running the Test + +```bash +# Run the specific test +cd dumpling/tests +./run.sh composite_string_key + +# Or run all dumpling integration tests +make dumpling_integration_test +``` + +The test will: +1. Create the test database and tables +2. Insert test data with various composite string key scenarios +3. Run dumpling with chunking enabled (--rows 5) +4. Verify the output matches expected results +5. Clean up test artifacts \ No newline at end of file diff --git a/dumpling/tests/composite_string_key/data/comp_str_case_0.sql b/dumpling/tests/composite_string_key/data/comp_str_case_0.sql new file mode 100644 index 0000000000000..b11ffb3021241 --- /dev/null +++ b/dumpling/tests/composite_string_key/data/comp_str_case_0.sql @@ -0,0 +1,21 @@ +# test composite string primary key - basic scenario +create table `comp_str_case_0` ( + tenant_id varchar(50), + user_id varchar(50), + name varchar(100), + email varchar(255), + created_at datetime, + primary key (tenant_id, user_id) +); + +insert into `comp_str_case_0` values +('tenant_a', 'user_001', 'Alice Johnson', 'alice@tenant-a.com', '2023-01-15 10:30:00'), +('tenant_a', 'user_002', 'Bob Smith', 'bob@tenant-a.com', '2023-01-16 11:45:00'), +('tenant_a', 'user_003', 'Carol Davis', 'carol@tenant-a.com', '2023-01-17 09:15:00'), +('tenant_b', 'user_001', 'Dave Wilson', 'dave@tenant-b.com', '2023-01-18 14:20:00'), +('tenant_b', 'user_002', 'Eve Brown', 'eve@tenant-b.com', '2023-01-19 16:30:00'), +('tenant_b', 'user_003', 'Frank Miller', 'frank@tenant-b.com', '2023-01-20 
08:45:00'), +('tenant_c', 'user_001', 'Grace Taylor', 'grace@tenant-c.com', '2023-01-21 13:10:00'), +('tenant_c', 'user_002', 'Henry Anderson', 'henry@tenant-c.com', '2023-01-22 12:00:00'), +('tenant_c', 'user_003', 'Ivy Thomas', 'ivy@tenant-c.com', '2023-01-23 15:30:00'), +('tenant_d', 'user_001', 'Jack Moore', 'jack@tenant-d.com', '2023-01-24 10:15:00'); \ No newline at end of file diff --git a/dumpling/tests/composite_string_key/data/comp_str_case_1.sql b/dumpling/tests/composite_string_key/data/comp_str_case_1.sql new file mode 100644 index 0000000000000..5999634be7697 --- /dev/null +++ b/dumpling/tests/composite_string_key/data/comp_str_case_1.sql @@ -0,0 +1,20 @@ +# test composite string primary key - special characters and edge cases +create table `comp_str_case_1` ( + category varchar(100), + item_id varchar(100), + description text, + price decimal(10,2), + primary key (category, item_id) +); + +insert into `comp_str_case_1` values +('electronics', 'item_001', 'Smartphone with special chars: !@#$%', 599.99), +('electronics', 'item_002', 'Laptop "Gaming Edition"', 1299.50), +('electronics', 'item_003', 'Tablet (WiFi Only)', 399.00), +('home & garden', 'item_001', 'Garden Tools Set', 89.99), +('home & garden', 'item_002', 'Outdoor Chair', 149.95), +('sports', 'item_001', 'Basketball', 29.99), +('sports', 'item_002', 'Tennis Racket', 79.50), +('toys & games', 'item_001', 'Board Game "Strategy Master"', 45.00), +('toys & games', 'item_002', 'Puzzle 1000 pieces', 25.99), +('clothing', 'item_001', 'T-Shirt Size M', 19.99); \ No newline at end of file diff --git a/dumpling/tests/composite_string_key/data/comp_str_case_2.sql b/dumpling/tests/composite_string_key/data/comp_str_case_2.sql new file mode 100644 index 0000000000000..8dc2878cd6390 --- /dev/null +++ b/dumpling/tests/composite_string_key/data/comp_str_case_2.sql @@ -0,0 +1,23 @@ +# test composite string primary key - three column composite key with mixed length strings +create table `comp_str_case_2` ( + 
region varchar(20), + country varchar(50), + city varchar(100), + population bigint, + area_km2 decimal(15,2), + primary key (region, country, city) +); + +insert into `comp_str_case_2` values +('asia', 'china', 'beijing', 21540000, 16410.54), +('asia', 'china', 'shanghai', 24280000, 6340.50), +('asia', 'japan', 'tokyo', 37400000, 2194.07), +('asia', 'japan', 'osaka', 19281000, 1905.00), +('europe', 'france', 'paris', 10858000, 105.40), +('europe', 'germany', 'berlin', 3669000, 891.85), +('europe', 'italy', 'rome', 2873000, 1285.31), +('north america', 'usa', 'new york', 8336000, 778.20), +('north america', 'usa', 'los angeles', 3979000, 1302.15), +('north america', 'canada', 'toronto', 2930000, 630.21), +('south america', 'brazil', 'sao paulo', 12330000, 1521.11), +('oceania', 'australia', 'sydney', 5312000, 12368.19); \ No newline at end of file diff --git a/dumpling/tests/composite_string_key/data/comp_str_case_3.sql b/dumpling/tests/composite_string_key/data/comp_str_case_3.sql new file mode 100644 index 0000000000000..d854e165452c8 --- /dev/null +++ b/dumpling/tests/composite_string_key/data/comp_str_case_3.sql @@ -0,0 +1,21 @@ +# test composite string primary key - unicode and emoji characters +create table `comp_str_case_3` ( + lang_code varchar(10), + message_id varchar(50), + content text, + primary key (lang_code, message_id) +); + +insert into `comp_str_case_3` values +('en', 'welcome', 'Welcome to our application! 
😊'), +('en', 'goodbye', 'Thank you for using our service'), +('zh', 'welcome', '欢迎使用我们的应用程序!'), +('zh', 'goodbye', '感谢您使用我们的服务'), +('ja', 'welcome', 'アプリケーションへようこそ!'), +('ja', 'goodbye', 'サービスをご利用いただきありがとうございます'), +('es', 'welcome', '¡Bienvenido a nuestra aplicación!'), +('es', 'goodbye', 'Gracias por usar nuestro servicio'), +('fr', 'welcome', 'Bienvenue dans notre application !'), +('fr', 'goodbye', 'Merci d''utiliser notre service'), +('de', 'welcome', 'Willkommen in unserer Anwendung!'), +('ar', 'welcome', 'مرحبا بكم في تطبيقنا!'); \ No newline at end of file diff --git a/dumpling/tests/composite_string_key/result/comp_str_case_0.000000000.sql b/dumpling/tests/composite_string_key/result/comp_str_case_0.000000000.sql new file mode 100644 index 0000000000000..b241bd6ee347b --- /dev/null +++ b/dumpling/tests/composite_string_key/result/comp_str_case_0.000000000.sql @@ -0,0 +1,8 @@ +/*!40014 SET FOREIGN_KEY_CHECKS=0*/; +/*!40101 SET NAMES binary*/; +INSERT INTO `comp_str_case_0` VALUES +('tenant_a','user_001','Alice Johnson','alice@tenant-a.com','2023-01-15 10:30:00'), +('tenant_a','user_002','Bob Smith','bob@tenant-a.com','2023-01-16 11:45:00'), +('tenant_a','user_003','Carol Davis','carol@tenant-a.com','2023-01-17 09:15:00'), +('tenant_b','user_001','Dave Wilson','dave@tenant-b.com','2023-01-18 14:20:00'), +('tenant_b','user_002','Eve Brown','eve@tenant-b.com','2023-01-19 16:30:00'); diff --git a/dumpling/tests/composite_string_key/result/comp_str_case_0.000000001.sql b/dumpling/tests/composite_string_key/result/comp_str_case_0.000000001.sql new file mode 100644 index 0000000000000..5e46dd4c02f36 --- /dev/null +++ b/dumpling/tests/composite_string_key/result/comp_str_case_0.000000001.sql @@ -0,0 +1,8 @@ +/*!40014 SET FOREIGN_KEY_CHECKS=0*/; +/*!40101 SET NAMES binary*/; +INSERT INTO `comp_str_case_0` VALUES +('tenant_b','user_003','Frank Miller','frank@tenant-b.com','2023-01-20 08:45:00'), +('tenant_c','user_001','Grace Taylor','grace@tenant-c.com','2023-01-21 
13:10:00'), +('tenant_c','user_002','Henry Anderson','henry@tenant-c.com','2023-01-22 12:00:00'), +('tenant_c','user_003','Ivy Thomas','ivy@tenant-c.com','2023-01-23 15:30:00'), +('tenant_d','user_001','Jack Moore','jack@tenant-d.com','2023-01-24 10:15:00'); diff --git a/dumpling/tests/composite_string_key/result/comp_str_case_1.000000000.sql b/dumpling/tests/composite_string_key/result/comp_str_case_1.000000000.sql new file mode 100644 index 0000000000000..b936a7332d196 --- /dev/null +++ b/dumpling/tests/composite_string_key/result/comp_str_case_1.000000000.sql @@ -0,0 +1,8 @@ +/*!40014 SET FOREIGN_KEY_CHECKS=0*/; +/*!40101 SET NAMES binary*/; +INSERT INTO `comp_str_case_1` VALUES +('clothing','item_001','T-Shirt Size M',19.99), +('electronics','item_001','Smartphone with special chars: !@#$%',599.99), +('electronics','item_002','Laptop \"Gaming Edition\"',1299.50), +('electronics','item_003','Tablet (WiFi Only)',399.00), +('home & garden','item_001','Garden Tools Set',89.99); diff --git a/dumpling/tests/composite_string_key/result/comp_str_case_1.000000001.sql b/dumpling/tests/composite_string_key/result/comp_str_case_1.000000001.sql new file mode 100644 index 0000000000000..d0f1c4f46122c --- /dev/null +++ b/dumpling/tests/composite_string_key/result/comp_str_case_1.000000001.sql @@ -0,0 +1,8 @@ +/*!40014 SET FOREIGN_KEY_CHECKS=0*/; +/*!40101 SET NAMES binary*/; +INSERT INTO `comp_str_case_1` VALUES +('home & garden','item_002','Outdoor Chair',149.95), +('sports','item_001','Basketball',29.99), +('sports','item_002','Tennis Racket',79.50), +('toys & games','item_001','Board Game \"Strategy Master\"',45.00), +('toys & games','item_002','Puzzle 1000 pieces',25.99); diff --git a/dumpling/tests/composite_string_key/result/comp_str_case_2.000000000.sql b/dumpling/tests/composite_string_key/result/comp_str_case_2.000000000.sql new file mode 100644 index 0000000000000..aa0bb1e882318 --- /dev/null +++ 
b/dumpling/tests/composite_string_key/result/comp_str_case_2.000000000.sql @@ -0,0 +1,8 @@ +/*!40014 SET FOREIGN_KEY_CHECKS=0*/; +/*!40101 SET NAMES binary*/; +INSERT INTO `comp_str_case_2` VALUES +('asia','china','beijing',21540000,16410.54), +('asia','china','shanghai',24280000,6340.50), +('asia','japan','osaka',19281000,1905.00), +('asia','japan','tokyo',37400000,2194.07), +('europe','france','paris',10858000,105.40); diff --git a/dumpling/tests/composite_string_key/result/comp_str_case_2.000000001.sql b/dumpling/tests/composite_string_key/result/comp_str_case_2.000000001.sql new file mode 100644 index 0000000000000..0ce70c5f18902 --- /dev/null +++ b/dumpling/tests/composite_string_key/result/comp_str_case_2.000000001.sql @@ -0,0 +1,8 @@ +/*!40014 SET FOREIGN_KEY_CHECKS=0*/; +/*!40101 SET NAMES binary*/; +INSERT INTO `comp_str_case_2` VALUES +('europe','germany','berlin',3669000,891.85), +('europe','italy','rome',2873000,1285.31), +('north america','canada','toronto',2930000,630.21), +('north america','usa','los angeles',3979000,1302.15), +('north america','usa','new york',8336000,778.20); diff --git a/dumpling/tests/composite_string_key/result/comp_str_case_2.000000002.sql b/dumpling/tests/composite_string_key/result/comp_str_case_2.000000002.sql new file mode 100644 index 0000000000000..7ca51e1ef8357 --- /dev/null +++ b/dumpling/tests/composite_string_key/result/comp_str_case_2.000000002.sql @@ -0,0 +1,5 @@ +/*!40014 SET FOREIGN_KEY_CHECKS=0*/; +/*!40101 SET NAMES binary*/; +INSERT INTO `comp_str_case_2` VALUES +('oceania','australia','sydney',5312000,12368.19), +('south america','brazil','sao paulo',12330000,1521.11); diff --git a/dumpling/tests/composite_string_key/result/comp_str_case_3.000000000.sql b/dumpling/tests/composite_string_key/result/comp_str_case_3.000000000.sql new file mode 100644 index 0000000000000..01157949f98fc --- /dev/null +++ b/dumpling/tests/composite_string_key/result/comp_str_case_3.000000000.sql @@ -0,0 +1,8 @@ +/*!40014 SET 
FOREIGN_KEY_CHECKS=0*/; +/*!40101 SET NAMES binary*/; +INSERT INTO `comp_str_case_3` VALUES +('ar','welcome','مرحبا بكم في تطبيقنا!'), +('de','welcome','Willkommen in unserer Anwendung!'), +('en','goodbye','Thank you for using our service'), +('en','welcome','Welcome to our application! 😊'), +('es','goodbye','Gracias por usar nuestro servicio'); diff --git a/dumpling/tests/composite_string_key/result/comp_str_case_3.000000001.sql b/dumpling/tests/composite_string_key/result/comp_str_case_3.000000001.sql new file mode 100644 index 0000000000000..ede19d9747438 --- /dev/null +++ b/dumpling/tests/composite_string_key/result/comp_str_case_3.000000001.sql @@ -0,0 +1,8 @@ +/*!40014 SET FOREIGN_KEY_CHECKS=0*/; +/*!40101 SET NAMES binary*/; +INSERT INTO `comp_str_case_3` VALUES +('es','welcome','¡Bienvenido a nuestra aplicación!'), +('fr','goodbye','Merci d\'utiliser notre service'), +('fr','welcome','Bienvenue dans notre application !'), +('ja','goodbye','サービスをご利用いただきありがとうございます'), +('ja','welcome','アプリケーションへようこそ!'); diff --git a/dumpling/tests/composite_string_key/result/comp_str_case_3.000000002.sql b/dumpling/tests/composite_string_key/result/comp_str_case_3.000000002.sql new file mode 100644 index 0000000000000..66dd13111349a --- /dev/null +++ b/dumpling/tests/composite_string_key/result/comp_str_case_3.000000002.sql @@ -0,0 +1,5 @@ +/*!40014 SET FOREIGN_KEY_CHECKS=0*/; +/*!40101 SET NAMES binary*/; +INSERT INTO `comp_str_case_3` VALUES +('zh','goodbye','感谢您使用我们的服务'), +('zh','welcome','欢迎使用我们的应用程序!'); diff --git a/dumpling/tests/composite_string_key/run.sh b/dumpling/tests/composite_string_key/run.sh new file mode 100755 index 0000000000000..4a0d733bafae4 --- /dev/null +++ b/dumpling/tests/composite_string_key/run.sh @@ -0,0 +1,93 @@ +#!/usr/bin/env bash +# +# Copyright 2024 PingCAP, Inc. Licensed under Apache-2.0. +# Bash is required: the chunk-iteration loop below uses `read -r -d ''`, +# which is a bash extension that POSIX /bin/sh (e.g. dash) does not implement.
+ +set -eu + +# Configure global charset/collation settings on the server that run_sql connects to +run_sql "set global character_set_server=utf8mb4" +run_sql "set global character_set_client=utf8mb4" +run_sql "set global character_set_connection=utf8mb4" +run_sql "set global character_set_results=utf8mb4" +run_sql "set global collation_server=utf8mb4_bin" +run_sql "set global collation_connection=utf8mb4_bin" + +run_sql "drop database if exists composite_string_key" +run_sql "create database composite_string_key DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin" +export DUMPLING_TEST_DATABASE=composite_string_key + +for data in "$DUMPLING_BASE_NAME"/data/*; do + run_sql_file "$data" +done + +# Refresh row statistics so EXPLAIN returns accurate estimates — the +# streaming chunker falls back to direct COUNT(*) when EXPLAIN under- +# estimates, but running ANALYZE up-front keeps this integration test +# deterministic regardless of MySQL/InnoDB stats-refresh timing. +for table in comp_str_case_0 comp_str_case_1 comp_str_case_2 comp_str_case_3; do + run_sql "analyze table composite_string_key.$table" +done + +# Run dumpling with --rows parameter to force chunking +# With --rows 5, tables will be split into multiple chunks +# Each chunk should contain a complete INSERT statement with ~5 rows +run_dumpling --rows 5 + +for file_path in "$DUMPLING_BASE_NAME"/data/*; do + base_name=$(basename "$file_path") + table_name="${base_name%.sql}" + + # Count chunk files via find so the argument list handles odd characters + # and we don't depend on `ls` output shape (shellcheck SC2012). + chunk_count=$(find "$DUMPLING_OUTPUT_DIR" -maxdepth 1 \ + -name "composite_string_key.${table_name}.[0-9]*.sql" | wc -l | tr -d ' ') + + # Determine expected chunks based on table row counts.
+ # With --rows 5: + # comp_str_case_0: 10 rows -> 2 chunks + # comp_str_case_1: 10 rows -> 2 chunks + # comp_str_case_2: 12 rows -> 3 chunks + # comp_str_case_3: 12 rows -> 3 chunks + case "$table_name" in + "comp_str_case_0" | "comp_str_case_1") + expected_chunks=2 + ;; + "comp_str_case_2" | "comp_str_case_3") + expected_chunks=3 + ;; + *) + echo "ERROR: Unknown table $table_name" + exit 1 + ;; + esac + + if [ "$chunk_count" -ne "$expected_chunks" ]; then + echo "ERROR: Expected $expected_chunks chunks for $table_name, but found $chunk_count" + exit 1 + fi + + # Compare each chunk file with its expected fixture. Use plain `diff` + # (no -B / -w) so whitespace-sensitive regressions — a dropped newline + # at a chunk boundary, a rogue space in a quoted value, a mis-escape — + # surface instead of being silently ignored. + find "$DUMPLING_OUTPUT_DIR" -maxdepth 1 \ + -name "composite_string_key.${table_name}.[0-9]*.sql" -print0 \ + | while IFS= read -r -d '' chunk_file; do + chunk_basename=$(basename "$chunk_file") + expected_file="${chunk_basename#composite_string_key.}" + + if [ ! -f "$DUMPLING_BASE_NAME/result/$expected_file" ]; then + echo "ERROR: Expected result file $DUMPLING_BASE_NAME/result/$expected_file not found" + exit 1 + fi + + if ! diff "$chunk_file" "$DUMPLING_BASE_NAME/result/$expected_file"; then + echo "ERROR: Chunk file $chunk_file does not match expected result $expected_file" + exit 1 + fi + done + + echo "Table $table_name: Successfully validated $chunk_count chunks" +done \ No newline at end of file diff --git a/dumpling/tests/composite_string_key_large/conf/diff_config.toml b/dumpling/tests/composite_string_key_large/conf/diff_config.toml new file mode 100644 index 0000000000000..4f2a90152357d --- /dev/null +++ b/dumpling/tests/composite_string_key_large/conf/diff_config.toml @@ -0,0 +1,25 @@ +# sync_diff_inspector config: verify MySQL source == TiDB target after +# dumpling --rows chunking + lightning reimport.
+ +check-thread-count = 4 +export-fix-sql = true +check-struct-only = false + +[task] + output-dir = "./output" + source-instances = ["mysql1"] + target-instance = "tidb0" + target-check-tables = ["composite_string_key_large.events", "composite_string_key_large.translations"] + +[data-sources] +[data-sources.mysql1] +host = "127.0.0.1" +port = 3306 +user = "root" +password = "" + +[data-sources.tidb0] +host = "127.0.0.1" +port = 4000 +user = "root" +password = "" diff --git a/dumpling/tests/composite_string_key_large/conf/lightning.toml b/dumpling/tests/composite_string_key_large/conf/lightning.toml new file mode 100644 index 0000000000000..adf174c03825c --- /dev/null +++ b/dumpling/tests/composite_string_key_large/conf/lightning.toml @@ -0,0 +1,20 @@ +### tidb-lightning config for composite_string_key_large round-trip + +[lightning] +server-mode = false +level = "error" +check-requirements = false + +[tikv-importer] +backend = "tidb" +on-duplicate = "error" + +[mydumper] +data-source-dir = "/tmp/dumpling_test_result/sql_res.composite_string_key_large" + +[tidb] +host = "127.0.0.1" +port = 4000 +user = "root" +password = "" +status-port = 10080 diff --git a/dumpling/tests/composite_string_key_large/run.sh b/dumpling/tests/composite_string_key_large/run.sh new file mode 100755 index 0000000000000..5114ed2bd6a8d --- /dev/null +++ b/dumpling/tests/composite_string_key_large/run.sh @@ -0,0 +1,165 @@ +#!/usr/bin/env bash +# +# Copyright 2026 PingCAP, Inc. Licensed under Apache-2.0. +# +# Bash is required: the chunk-iteration loop below uses `read -r -d ''`, +# which is a bash extension that POSIX /bin/sh (e.g. dash) does not implement. +# +# Larger-scale regression test for string-based (composite) primary-key +# chunking. It proves end-to-end correctness by: +# +# 1. Loading N rows into MySQL with composite string PKs that exercise +# cursor-based WHERE boundaries (shared prefix, unicode, escapes, +# 3-column keys). +# 2.
Dumping via dumpling with --rows small enough to produce many +# chunks (streaming path in concurrentDumpStringFields). +# 3. Asserting each table was split into at least 2 chunks, i.e. the +# streaming chunker actually engaged. +# 4. Re-importing the dump into TiDB with tidb-lightning. +# 5. Using sync_diff_inspector to compare MySQL (source) vs TiDB +# (target). Any row loss or duplication at chunk boundaries will +# surface here as a checksum mismatch. +# +# This complements dumpling/tests/composite_string_key (small fixture +# byte-diff). Fixture diff catches format regressions; this test catches +# data-loss/ordering regressions that fixture diff can't detect at scale. + +set -eu +cur=$(cd "$(dirname "$0")"; pwd) + +DB_NAME="composite_string_key_large" + +# Ensure UTF8MB4 so unicode PK values round-trip. +export DUMPLING_TEST_PORT=3306 +run_sql "set global character_set_server=utf8mb4" +run_sql "set global collation_server=utf8mb4_bin" + +# Drop and recreate on both sides. +run_sql "drop database if exists \`$DB_NAME\`;" +export DUMPLING_TEST_PORT=4000 +run_sql "drop database if exists \`$DB_NAME\`;" + +export DUMPLING_TEST_PORT=3306 +run_sql "create database \`$DB_NAME\` DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;" +export DUMPLING_TEST_DATABASE="$DB_NAME" + +# --- Table A: 2-column composite PK with heavy shared prefix --------------- +# Exercises the cursor WHERE clause: (tenant = v1 AND event_id > v2) OR tenant > v1 +run_sql "CREATE TABLE \`$DB_NAME\`.\`events\` ( + tenant VARCHAR(32) NOT NULL, + event_id VARCHAR(32) NOT NULL, + payload VARCHAR(128) NOT NULL, + PRIMARY KEY (tenant, event_id) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;" + +# 10 tenants x 50 events = 500 rows. All tenants share a long prefix so the +# cursor boundary often lands inside a run of identical tenant values — the +# exact regression surface that plain single-column boundaries miss.
Generate +# via python into a tempfile, then load with the harness's run_sql_file so we +# pick up $DUMPLING_TEST_USER / port / future auth changes automatically. +events_sql=$(mktemp) +python3 - >"$events_sql" <<'PY' +rows = [] +for t in range(10): + tenant = f"tenant-prefix-shared-{t:03d}" + for e in range(50): + eid = f"evt-{e:04d}" + # Exercise SQL escaping: single/double quotes and a backslash. + # Double single-quotes and double backslashes so the literal is valid + # under MySQL's default sql_mode (which treats backslash as escape). + raw = f"payload for tenant {t} event {e} with 'single' and \"double\" quotes and a backslash \\." + payload = raw.replace("\\", "\\\\").replace("'", "''") + rows.append(f"('{tenant}','{eid}','{payload}')") +print("INSERT INTO events (tenant,event_id,payload) VALUES") +print(",\n".join(rows) + ";") +PY +run_sql_file "$events_sql" +rm -f "$events_sql" + +# --- Table B: 3-column composite PK with unicode and NULL-able body --------- +run_sql "CREATE TABLE \`$DB_NAME\`.\`translations\` ( + locale VARCHAR(8) NOT NULL, + namespace VARCHAR(32) NOT NULL, + msg_key VARCHAR(64) NOT NULL, + body TEXT NULL, + PRIMARY KEY (locale, namespace, msg_key) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;" + +translations_sql=$(mktemp) +python3 - >"$translations_sql" <<'PY' +locales = ["en", "ja", "zh", "de", "fr"] +namespaces = ["auth", "billing", "profile"] +bodies = { + "en": "Hello, world!", + "ja": "こんにちは、世界!", + "zh": "你好,世界!", + "de": "Hallo, Welt!", + "fr": "Bonjour, le monde !", +} +rows = [] +for loc in locales: + for ns in namespaces: + for k in range(20): + msg_key = f"key_{k:03d}" + body = bodies[loc].replace("'", "''") + rows.append(f"('{loc}','{ns}','{msg_key}','{body}')") +print("INSERT INTO translations (locale,namespace,msg_key,body) VALUES") +print(",\n".join(rows) + ";") +PY +run_sql_file "$translations_sql" +rm -f "$translations_sql" + +# Analyze so EXPLAIN-based estimation is accurate.
+run_sql "analyze table \`$DB_NAME\`.\`events\`;" +run_sql "analyze table \`$DB_NAME\`.\`translations\`;" + +# --- Dump ------------------------------------------------------------------ +# --rows 50 on a 500-row events table => ~10 chunks; 300-row translations => ~6. +run_dumpling --rows 50 --loglevel info + +# --- Assert chunk counts --------------------------------------------------- +events_chunks=$(find "$DUMPLING_OUTPUT_DIR" -maxdepth 1 -name "$DB_NAME.events.[0-9]*.sql" | wc -l | tr -d ' ') +translations_chunks=$(find "$DUMPLING_OUTPUT_DIR" -maxdepth 1 -name "$DB_NAME.translations.[0-9]*.sql" | wc -l | tr -d ' ') + +echo "events chunks: $events_chunks" +echo "translations chunks: $translations_chunks" + +# A correct streaming chunker must produce > 1 chunk here (proves parallelism +# was engaged). Exact count depends on estimation rounding; require >= 2. +if [ "$events_chunks" -lt 2 ]; then + echo "FAIL: events produced $events_chunks chunks, expected >= 2 (string chunking disabled?)" + exit 1 +fi +if [ "$translations_chunks" -lt 2 ]; then + echo "FAIL: translations produced $translations_chunks chunks, expected >= 2" + exit 1 +fi + +# Each data chunk must be a self-contained INSERT (per lance6716 review). +# The [0-9]* pattern excludes the -schema.sql meta files (which have no +# INSERT statement). NUL-delimited find/while loop is robust to any path. +find "$DUMPLING_OUTPUT_DIR" -maxdepth 1 -type f \ + \( -name "$DB_NAME.events.[0-9]*.sql" -o -name "$DB_NAME.translations.[0-9]*.sql" \) \ + -print0 | while IFS= read -r -d '' chunk; do + if ! grep -q "^INSERT INTO" "$chunk"; then + echo "FAIL: $chunk missing INSERT prefix" + exit 1 + fi + # The last line must end with ';' (a trailing '\n' after it is + # fine — the $(...) command substitution strips trailing newlines). Looser checks such + # as `tail -c 3 | grep ';'` also match things like ');' followed by more + # bytes, which isn't the contract we want to assert.
+ last_line=$(tail -n 1 "$chunk") + case "$last_line" in + *";") ;; + *) echo "FAIL: $chunk last line does not end with ';' (got: $last_line)"; exit 1 ;; + esac +done + +# --- Round-trip: lightning imports chunks into TiDB ------------------------ +run_lightning "$cur/conf/lightning.toml" + +# --- sync_diff: MySQL source vs TiDB target must match exactly ------------- +check_sync_diff "$cur/conf/diff_config.toml" + +echo "composite_string_key_large: OK"