Skip to content

Commit b541789

Browse files
committed
[#61999] improve dumpling string key handling
1 parent 0870187 commit b541789

8 files changed

Lines changed: 1285 additions & 25 deletions

File tree

dumpling/export/dump.go

Lines changed: 139 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"slices"
1515
"strconv"
1616
"strings"
17+
"sync"
1718
"sync/atomic"
1819
"time"
1920

@@ -53,6 +54,9 @@ var errEmptyHandleVals = errors.New("empty handleVals for TiDB table")
5354
// see https://docs.pingcap.com/zh/tidb/dev/system-variables#tidb_enable_paging-%E4%BB%8E-v540-%E7%89%88%E6%9C%AC%E5%BC%80%E5%A7%8B%E5%BC%95%E5%85%A5
5455
var enablePagingVersion = semver.New("6.2.0")
5556

57+
// Maximum number of chunks to prevent infinite loops during boundary sampling
58+
const maxChunkLimit = 1000000
59+
5660
// Dumper is the dump progress structure
5761
type Dumper struct {
5862
tctx *tcontext.Context
@@ -68,7 +72,10 @@ type Dumper struct {
6872
totalTables int64
6973
charsetAndDefaultCollationMap map[string]string
7074

71-
speedRecorder *SpeedRecorder
75+
speedRecorder *SpeedRecorder
76+
adaptiveChunkSizer *AdaptiveChunkSizer
77+
78+
chunkedTables sync.Map
7279
}
7380

7481
// NewDumper returns a new Dumper
@@ -93,6 +100,7 @@ func NewDumper(ctx context.Context, conf *Config) (*Dumper, error) {
93100
cancelCtx: cancelFn,
94101
selectTiDBTableRegionFunc: selectTiDBTableRegion,
95102
speedRecorder: NewSpeedRecorder(),
103+
adaptiveChunkSizer: NewAdaptiveChunkSizer(int64(conf.Rows)),
96104
}
97105

98106
var err error
@@ -347,27 +355,25 @@ func (d *Dumper) startWriters(tctx *tcontext.Context, wg *errgroup.Group, taskCh
347355
if err != nil {
348356
return nil, func() {}, err
349357
}
350-
writer := NewWriter(tctx, int64(i), conf, conn, d.extStore, d.metrics)
358+
writer := NewWriter(tctx, int64(i), conf, conn, d.extStore, d.metrics, d.adaptiveChunkSizer)
351359
writer.rebuildConnFn = rebuildConnFn
352360
writer.setFinishTableCallBack(func(task Task) {
353-
if _, ok := task.(*TaskTableData); ok {
354-
IncCounter(d.metrics.finishedTablesCounter)
355-
// FIXME: actually finishing the last chunk doesn't means this table is 'finished'.
356-
// We can call this table is 'finished' if all its chunks are finished.
357-
// Comment this log now to avoid ambiguity.
358-
// tctx.L().Debug("finished dumping table data",
359-
// zap.String("database", td.Meta.DatabaseName()),
360-
// zap.String("table", td.Meta.TableName()))
361-
failpoint.Inject("EnableLogProgress", func() {
362-
time.Sleep(1 * time.Second)
363-
tctx.L().Debug("EnableLogProgress, sleep 1s")
364-
})
365-
}
361+
// this is called when a file is finished.
366362
})
367363
writer.setFinishTaskCallBack(func(task Task) {
368364
IncGauge(d.metrics.taskChannelCapacity)
369365
if td, ok := task.(*TaskTableData); ok {
370366
d.metrics.completedChunks.Add(1)
367+
if val, ok := d.chunkedTables.Load(td.Meta.ChunkKey()); ok {
368+
chunkStats := val.(*tableChunkStat)
369+
finishedChunks := chunkStats.finished.Add(1)
370+
if chunkStats.finalized.Load() && finishedChunks == chunkStats.sent.Load() {
371+
IncCounter(d.metrics.finishedTablesCounter)
372+
d.chunkedTables.Delete(td.Meta.ChunkKey())
373+
}
374+
} else {
375+
IncCounter(d.metrics.finishedTablesCounter)
376+
}
371377
tctx.L().Debug("finish dumping table data task",
372378
zap.String("database", td.Meta.DatabaseName()),
373379
zap.String("table", td.Meta.TableName()),
@@ -619,7 +625,7 @@ func (d *Dumper) dumpTableData(tctx *tcontext.Context, conn *BaseConn, meta Tabl
619625
}
620626

621627
// Update total rows
622-
fieldName, _ := pickupPossibleField(tctx, meta, conn)
628+
fieldName, _, _ := pickupPossibleField(tctx, meta, conn)
623629
c := estimateCount(tctx, meta.DatabaseName(), meta.TableName(), conn, fieldName, conf)
624630
AddCounter(d.metrics.estimateTotalRowsCounter, float64(c))
625631

@@ -729,9 +735,10 @@ func (d *Dumper) sequentialDumpTable(tctx *tcontext.Context, conn *BaseConn, met
729735
}
730736

731737
// concurrentDumpTable tries to split table into several chunks to dump
732-
func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *BaseConn, meta TableMeta, taskChan chan<- Task) error {
738+
func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *BaseConn, meta TableMeta, taskChan chan<- Task) (err error) {
733739
conf := d.conf
734740
db, tbl := meta.DatabaseName(), meta.TableName()
741+
735742
if conf.ServerInfo.ServerType == version.ServerTypeTiDB &&
736743
conf.ServerInfo.ServerVersion != nil &&
737744
(conf.ServerInfo.ServerVersion.Compare(*tableSampleVersion) >= 0 ||
@@ -751,7 +758,7 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *BaseConn, met
751758
return err
752759
}
753760

754-
field, err := pickupPossibleField(tctx, meta, conn)
761+
field, isStringField, err := pickupPossibleField(tctx, meta, conn)
755762
if err != nil || field == "" {
756763
// skip split chunk logic if not found proper field
757764
tctx.L().Info("fallback to sequential dump due to no proper field. This won't influence the whole dump process",
@@ -774,6 +781,15 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *BaseConn, met
774781
return d.dumpWholeTableDirectly(tctx, meta, taskChan, "", orderByClause, 0, 1)
775782
}
776783

784+
// Handle string-based chunking
785+
if isStringField {
786+
tctx.L().Info("using string-based chunking",
787+
zap.String("database", db),
788+
zap.String("table", tbl),
789+
zap.String("field", field))
790+
return d.concurrentDumpStringField(tctx, conn, meta, taskChan, field, orderByClause, count)
791+
}
792+
777793
minv, maxv, err := d.selectMinAndMaxIntValue(tctx, conn, db, tbl, field)
778794
if err != nil {
779795
tctx.L().Info("fallback to sequential dump due to cannot get bounding values. This won't influence the whole dump process",
@@ -798,6 +814,15 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *BaseConn, met
798814

799815
chunkIndex := 0
800816
nullValueCondition := ""
817+
chunkStats := newTableChunkStat()
818+
d.chunkedTables.Store(meta.ChunkKey(), chunkStats)
819+
defer func() {
820+
chunkStats.finalized.Store(true)
821+
if chunkStats.finished.Load() == chunkStats.sent.Load() {
822+
IncCounter(d.metrics.finishedTablesCounter)
823+
d.chunkedTables.Delete(meta.ChunkKey())
824+
}
825+
}()
801826
if conf.Where == "" {
802827
nullValueCondition = fmt.Sprintf("`%s` IS NULL OR ", escapeString(field))
803828
}
@@ -820,6 +845,11 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *BaseConn, met
820845
}
821846

822847
func (d *Dumper) sendTaskToChan(tctx *tcontext.Context, task Task, taskChan chan<- Task) (ctxDone bool) {
848+
if td, ok := task.(*TaskTableData); ok {
849+
if val, ok := d.chunkedTables.Load(td.Meta.ChunkKey()); ok {
850+
val.(*tableChunkStat).sent.Add(1)
851+
}
852+
}
823853
select {
824854
case <-tctx.Done():
825855
return true
@@ -831,6 +861,20 @@ func (d *Dumper) sendTaskToChan(tctx *tcontext.Context, task Task, taskChan chan
831861
}
832862
}
833863

864+
type tableChunkStat struct {
865+
sent *gatomic.Int32
866+
finished *gatomic.Int32
867+
finalized *gatomic.Bool
868+
}
869+
870+
func newTableChunkStat() *tableChunkStat {
871+
return &tableChunkStat{
872+
sent: gatomic.NewInt32(0),
873+
finished: gatomic.NewInt32(0),
874+
finalized: gatomic.NewBool(false),
875+
}
876+
}
877+
834878
func (d *Dumper) selectMinAndMaxIntValue(tctx *tcontext.Context, conn *BaseConn, db, tbl, field string) (minv, maxv *big.Int, err error) {
835879
conf, zero := d.conf, &big.Int{}
836880
query := fmt.Sprintf("SELECT MIN(`%s`),MAX(`%s`) FROM `%s`.`%s`",
@@ -923,6 +967,16 @@ func (d *Dumper) concurrentDumpTiDBPartitionTables(tctx *tcontext.Context, conn
923967
totalChunk += len(handleVals) + 1
924968
cachedHandleVals[i] = handleVals
925969
}
970+
971+
chunkStats := newTableChunkStat()
972+
d.chunkedTables.Store(meta.ChunkKey(), chunkStats)
973+
defer func() {
974+
chunkStats.finalized.Store(true)
975+
if chunkStats.finished.Load() == chunkStats.sent.Load() {
976+
IncCounter(d.metrics.finishedTablesCounter)
977+
d.chunkedTables.Delete(meta.ChunkKey())
978+
}
979+
}()
926980
for i, partition := range partitions {
927981
err := d.sendConcurrentDumpTiDBTasks(tctx, meta, taskChan, handleColNames, cachedHandleVals[i], partition, startChunkIdx, totalChunk)
928982
if err != nil {
@@ -1709,3 +1763,70 @@ func (d *Dumper) newTaskTableData(meta TableMeta, data TableDataIR, currentChunk
17091763
d.metrics.totalChunks.Add(1)
17101764
return NewTaskTableData(meta, data, currentChunk, totalChunks)
17111765
}
1766+
1767+
// extractOrderByColumns extracts column names from ORDER BY clause
1768+
// Input: "ORDER BY `item_id`,`photo_index`"
1769+
// Output: ["`item_id`", "`photo_index`"]
1770+
func extractOrderByColumns(orderByClause string) []string {
1771+
// Remove "ORDER BY " prefix
1772+
columnsStr := strings.TrimPrefix(orderByClause, "ORDER BY ")
1773+
1774+
// Split by comma and trim spaces
1775+
columns := strings.Split(columnsStr, ",")
1776+
for i, col := range columns {
1777+
columns[i] = strings.TrimSpace(col)
1778+
}
1779+
1780+
return columns
1781+
}
1782+
1783+
// checkRowNumberSupport tests if the database supports ROW_NUMBER() window function
1784+
// ROW_NUMBER() is supported by:
1785+
// - MySQL 8.0+ (window functions introduced)
1786+
// Not supported by:
1787+
// - MySQL 5.7 and earlier (no window functions)
1788+
// - Older database versions
1789+
func checkRowNumberSupport(tctx *tcontext.Context, conn *BaseConn, database, table string) bool {
1790+
// First, set the database context to avoid "No database selected" error
1791+
useDbQuery := fmt.Sprintf("USE `%s`", escapeString(database))
1792+
err := conn.QuerySQL(tctx, func(rows *sql.Rows) error {
1793+
// No rows expected from USE statement
1794+
return nil
1795+
}, func() {}, useDbQuery)
1796+
1797+
if err != nil {
1798+
// Can't set database context, fallback to OFFSET-based approach
1799+
tctx.L().Debug("Cannot set database context for ROW_NUMBER() check", zap.Error(err))
1800+
return false
1801+
}
1802+
1803+
// Test ROW_NUMBER() against the actual table to detect support
1804+
// This will return error 1305 (function not found) on older MySQL versions
1805+
testQuery := fmt.Sprintf("SELECT ROW_NUMBER() FROM `%s` LIMIT 1", escapeString(table))
1806+
1807+
var supported bool
1808+
err = conn.QuerySQL(tctx, func(rows *sql.Rows) error {
1809+
// If we get here, ROW_NUMBER() syntax is recognized (even if query fails for other reasons)
1810+
supported = true
1811+
return nil
1812+
}, func() {}, testQuery)
1813+
1814+
if err != nil {
1815+
// Check if it's the expected "function not found" error (1305)
1816+
if mysqlErr, ok := errors.Cause(err).(*mysql.MySQLError); ok && mysqlErr.Number == 1305 {
1817+
// Error 1305: FUNCTION does not exist - ROW_NUMBER() not supported
1818+
tctx.L().Info("ROW_NUMBER() not supported by this database version",
1819+
zap.Uint16("errorCode", mysqlErr.Number))
1820+
return false
1821+
}
1822+
// Other errors might indicate syntax issues or connection problems
1823+
// Still treat as not supported for safety
1824+
tctx.L().Debug("ROW_NUMBER() check failed with unexpected error", zap.Error(err))
1825+
return false
1826+
}
1827+
1828+
tctx.L().Info("ROW_NUMBER() support check completed",
1829+
zap.Bool("supported", supported))
1830+
1831+
return supported
1832+
}

dumpling/export/ir.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type TableMeta interface {
3535
ShowCreateView() string
3636
AvgRowLength() uint64
3737
HasImplicitRowID() bool
38+
ChunkKey() string
3839
}
3940

4041
// SQLRowIter is the iterator on a collection of sql.Row.

dumpling/export/metadata.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ type globalMetadata struct {
2626
storage storage.ExternalStorage
2727
}
2828

29+
func (t *tableMeta) ChunkKey() string {
30+
return t.DatabaseName() + "." + t.TableName()
31+
}
32+
2933
const (
3034
metadataPath = "metadata"
3135
metadataTimeLayout = time.DateTime

0 commit comments

Comments
 (0)