@@ -5,93 +5,11 @@ import (
55 "fmt"
66 "strconv"
77 "strings"
8- "sync"
9- "time"
108
119 tcontext "github.com/pingcap/tidb/dumpling/context"
1210 "go.uber.org/zap"
1311)
1412
15- // AdaptiveChunkSizer calculates optimal chunk sizes based on performance feedback
16- type AdaptiveChunkSizer struct {
17- sync.Mutex
18- currentChunkSize int64
19- minChunkSize int64
20- maxChunkSize int64
21- }
22-
23- // NewAdaptiveChunkSizer creates a new adaptive chunk sizer
24- func NewAdaptiveChunkSizer (initialChunkSize int64 ) * AdaptiveChunkSizer {
25- min := initialChunkSize / 8
26- if min == 0 {
27- min = 1
28- }
29- return & AdaptiveChunkSizer {
30- currentChunkSize : initialChunkSize ,
31- minChunkSize : min ,
32- maxChunkSize : initialChunkSize * 8 ,
33- }
34- }
35-
36- func (acs * AdaptiveChunkSizer ) Get () int64 {
37- acs .Lock ()
38- defer acs .Unlock ()
39- return acs .currentChunkSize
40- }
41-
42- // Adjust adjusts chunk size based on actual performance
43- func (acs * AdaptiveChunkSizer ) Adjust (tctx * tcontext.Context , actualDuration time.Duration ) {
44- acs .Lock ()
45- defer acs .Unlock ()
46-
47- const (
48- fastThreshold = 100 * time .Millisecond // Increased from 50ms to 100ms
49- slowThreshold = 5 * time .Second // Increased from 1s to 5s
50- increaseFactor = 1.5
51- decreaseFactor = 1.5
52- )
53-
54- oldChunkSize := acs .currentChunkSize
55- newChunkSize := acs .currentChunkSize
56-
57- if actualDuration < fastThreshold && acs .currentChunkSize < acs .maxChunkSize {
58- newChunkSize = int64 (float64 (acs .currentChunkSize ) * increaseFactor )
59- } else if actualDuration > slowThreshold && acs .currentChunkSize > acs .minChunkSize {
60- newChunkSize = int64 (float64 (acs .currentChunkSize ) / decreaseFactor )
61- }
62-
63- // Apply bounds
64- if newChunkSize < acs .minChunkSize {
65- newChunkSize = acs .minChunkSize
66- }
67- if newChunkSize > acs .maxChunkSize {
68- newChunkSize = acs .maxChunkSize
69- }
70- if newChunkSize == 0 {
71- newChunkSize = 1
72- }
73-
74- // Log chunk size changes
75- if newChunkSize != oldChunkSize {
76- var reason string
77- if actualDuration < fastThreshold {
78- reason = "fast query detected"
79- } else if actualDuration > slowThreshold {
80- reason = "slow query detected"
81- } else {
82- reason = "bounds adjustment"
83- }
84-
85- tctx .L ().Info ("adaptive chunk size adjusted" ,
86- zap .Int64 ("oldChunkSize" , oldChunkSize ),
87- zap .Int64 ("newChunkSize" , newChunkSize ),
88- zap .Duration ("queryDuration" , actualDuration ),
89- zap .String ("reason" , reason ))
90- }
91-
92- acs .currentChunkSize = newChunkSize
93- }
94-
9513// concurrentDumpStringFields handles composite key chunking with multiple columns
9614func (d * Dumper ) concurrentDumpStringFields (tctx * tcontext.Context , conn * BaseConn , meta TableMeta , taskChan chan <- Task , fields []string , orderByClause string , estimatedCount uint64 ) error {
9715 conf := d .conf
@@ -103,7 +21,7 @@ func (d *Dumper) concurrentDumpStringFields(tctx *tcontext.Context, conn *BaseCo
10321 totalCount = int64 (conf .Rows ) * 5 // Conservative fallback
10422 }
10523
106- chunkSize := d . adaptiveChunkSizer . Get ( )
24+ chunkSize := int64 ( d . conf . Rows )
10725 if totalCount <= chunkSize {
10826 tctx .L ().Info ("table too small for chunking, using sequential dump" ,
10927 zap .String ("database" , db ), zap .String ("table" , tbl ))
@@ -191,23 +109,12 @@ func (d *Dumper) streamStringChunks(tctx *tcontext.Context, conn *BaseConn, meta
191109 break
192110 }
193111
194- // Get current adaptive chunk size for this boundary
195- currentChunkSize := d .adaptiveChunkSizer .Get ()
196- if i == 1 || currentChunkSize != chunkSize {
197- tctx .L ().Debug ("using adaptive chunk size for boundary sampling" ,
198- zap .String ("database" , db ),
199- zap .String ("table" , tbl ),
200- zap .Int64 ("chunkIndex" , i ),
201- zap .Int64 ("originalChunkSize" , chunkSize ),
202- zap .Int64 ("currentChunkSize" , currentChunkSize ))
203- }
204-
205112 // Sample boundary for chunk i
206113 var sampleQuery string
207114
208115 if supportsRowNumber {
209116 // Use ROW_NUMBER() for more reliable boundary sampling
210- rowNumber := i * currentChunkSize
117+ rowNumber := i * chunkSize
211118 sampleQuery = fmt .Sprintf (
212119 "SELECT %s FROM (SELECT %s, ROW_NUMBER() OVER (%s) as rn FROM `%s`.`%s`) t WHERE rn = %d" ,
213120 selectCols ,
@@ -220,7 +127,7 @@ func (d *Dumper) streamStringChunks(tctx *tcontext.Context, conn *BaseConn, meta
220127 // Use cursor-based boundary sampling to avoid expensive OFFSET for large tables
221128 if len (previousBoundary ) == 0 {
222129 // First boundary: OFFSET is acceptable for the first boundary
223- offset := currentChunkSize
130+ offset := chunkSize
224131 sampleQuery = fmt .Sprintf (
225132 "SELECT %s FROM `%s`.`%s` %s LIMIT 1 OFFSET %d" ,
226133 selectCols ,
@@ -240,7 +147,7 @@ func (d *Dumper) streamStringChunks(tctx *tcontext.Context, conn *BaseConn, meta
240147 escapeString (tbl ),
241148 fullWhere ,
242149 orderByClause ,
243- currentChunkSize ) // Skip currentChunkSize more rows from cursor position
150+ chunkSize ) // Skip chunkSize more rows from cursor position
244151 }
245152 }
246153
0 commit comments