Skip to content

Commit 3631232

Browse files
committed
mysql_cdc: chunk large tables across workers via PK-range splitting
Adds an opt-in snapshot_chunks_per_table field to mysql_cdc. When left at its default (1) the snapshot flow is unchanged. When set higher, each table's first primary-key column is probed for MIN and MAX under the shared consistent-snapshot transaction and the resulting integer range is split into N half-open chunks that are dispatched across the existing snapshot_max_parallel_tables worker pool. This is a follow-up to the inter-table parallelism introduced in the "mysql_cdc: parallelise snapshot reads across tables" change. Inter-table parallelism alone cannot accelerate a snapshot dominated by a single very large table, which is the most common shape for message/event tables. Chunking splits that single-table work across the worker pool instead. Chunking is supported for tables whose first primary-key column is an integer type (tinyint/smallint/mediumint/int/integer/bigint, signed or unsigned). Composite primary keys are supported — chunking partitions on the leading column only, and per-chunk keyset pagination continues to respect the full PK ordering. Tables with non-numeric first PK columns fall back to a whole-table read with an informational log line so mixed workloads keep working. The consistency model is unchanged. All worker transactions still begin under one FLUSH TABLES WITH READ LOCK window so every chunk observes identical state at the same binlog position. Planning runs inside one worker's snapshot transaction so MIN/MAX agree with what every worker subsequently reads. The outermost chunks in each table are open-ended (no lower bound on the first chunk, no upper bound on the last) so rows at the exact MIN/MAX endpoints and any rows outside [MIN, MAX] are captured rather than silently dropped. The fan-out helper (previously distributeTablesToWorkers) is generalised to a generic distributeWorkToWorkers so the parallel path can dispatch chunk-typed work units while the existing fan-out tests keep passing with string inputs.
Field cap: snapshot_chunks_per_table is validated at config time to be within [1, 256], matching the pattern established for snapshot_max_parallel_tables. Tests added: - snapshot_chunking_test.go: splitIntRange coverage and overflow, buildChunkPredicate shapes, and generic fan-out against snapshotWorkUnit. - config_test.go: default, explicit, and out-of-range values for snapshot_chunks_per_table. - integration_test.go: TestIntegrationMySQLChunkedSnapshot exercises an int PK table and a composite (int, int) PK table with chunks=8 and asserts no duplicated rows across adjacent chunk boundaries; TestIntegrationMySQLChunkedSnapshotNonNumericPKFallback confirms the VARCHAR-PK fallback reads the whole table without error.
1 parent 6b1a4fa commit 3631232

8 files changed

Lines changed: 781 additions & 61 deletions

internal/impl/mysql/config_test.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,3 +99,83 @@ snapshot_max_parallel_tables: %d
9999
})
100100
}
101101
}
102+
103+
// Same shape as the max_parallel_tables tests: the new snapshot_chunks_per_table
104+
// field must default to 1 (preserving whole-table-read behaviour) and must
105+
// round-trip explicit values through the spec.
106+
func TestConfig_SnapshotChunksPerTable_DefaultAndExplicit(t *testing.T) {
107+
tests := []struct {
108+
name string
109+
yaml string
110+
expected int
111+
}{
112+
{
113+
name: "default",
114+
yaml: `
115+
dsn: user:password@tcp(localhost:3306)/db
116+
tables: [a]
117+
stream_snapshot: true
118+
checkpoint_cache: foo
119+
`,
120+
expected: 1,
121+
},
122+
{
123+
name: "explicit=16",
124+
yaml: `
125+
dsn: user:password@tcp(localhost:3306)/db
126+
tables: [a]
127+
stream_snapshot: true
128+
checkpoint_cache: foo
129+
snapshot_chunks_per_table: 16
130+
`,
131+
expected: 16,
132+
},
133+
}
134+
135+
for _, tc := range tests {
136+
t.Run(tc.name, func(t *testing.T) {
137+
conf, err := mysqlStreamConfigSpec.ParseYAML(tc.yaml, nil)
138+
require.NoError(t, err)
139+
140+
got, err := conf.FieldInt(fieldSnapshotChunksPerTable)
141+
require.NoError(t, err)
142+
assert.Equal(t, tc.expected, got)
143+
})
144+
}
145+
}
146+
147+
// Guards the same validation predicate for chunks_per_table that the
148+
// constructor enforces: values outside [1, maxSnapshotChunksPerTable] must
149+
// fail fast rather than produce runaway planning queries.
150+
func TestConfig_SnapshotChunksPerTable_InvalidValuesRejected(t *testing.T) {
151+
tests := []struct {
152+
name string
153+
value int
154+
}{
155+
{"zero", 0},
156+
{"negative", -1},
157+
{"above_upper_bound", maxSnapshotChunksPerTable + 1},
158+
{"absurdly_large", 100000},
159+
}
160+
161+
for _, tc := range tests {
162+
t.Run(tc.name, func(t *testing.T) {
163+
yaml := fmt.Sprintf(`
164+
dsn: user:password@tcp(localhost:3306)/db
165+
tables: [a]
166+
stream_snapshot: true
167+
checkpoint_cache: foo
168+
snapshot_chunks_per_table: %d
169+
`, tc.value)
170+
conf, err := mysqlStreamConfigSpec.ParseYAML(yaml, nil)
171+
require.NoError(t, err, "spec parsing itself should succeed; validation is enforced inside newMySQLStreamInput")
172+
173+
got, err := conf.FieldInt(fieldSnapshotChunksPerTable)
174+
require.NoError(t, err)
175+
assert.True(t,
176+
got < 1 || got > maxSnapshotChunksPerTable,
177+
"configured value should violate the [1, %d] range enforced in newMySQLStreamInput", maxSnapshotChunksPerTable,
178+
)
179+
})
180+
}
181+
}

internal/impl/mysql/input_mysql_stream.go

Lines changed: 61 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ const (
4242
fieldStreamSnapshot = "stream_snapshot"
4343
fieldSnapshotMaxBatchSize = "snapshot_max_batch_size"
4444
fieldSnapshotMaxParallelTables = "snapshot_max_parallel_tables"
45+
fieldSnapshotChunksPerTable = "snapshot_chunks_per_table"
4546
fieldMaxReconnectAttempts = "max_reconnect_attempts"
4647
fieldBatching = "batching"
4748
fieldCheckpointKey = "checkpoint_key"
@@ -60,6 +61,14 @@ const (
6061
// an issue — 256 is already well beyond the point at which the MySQL
6162
// server's own connection limits dominate.
6263
maxSnapshotParallelTables = 256
64+
65+
// maxSnapshotChunksPerTable caps chunks_per_table for the same reason as
66+
// maxSnapshotParallelTables: a mis-typed value should fail fast at config
67+
// parse time rather than produce thousands of MIN/MAX planning queries
68+
// and slow down startup. The actual concurrency ceiling is still
69+
// snapshot_max_parallel_tables — chunks above that just rebalance work
70+
// across the fixed worker pool.
71+
maxSnapshotChunksPerTable = 256
6372
)
6473

6574
func notImportedAWSOptFn(_ context.Context, awsConf *service.ParsedConfig, _ *mysql.Config, _ *service.Logger) (TokenBuilder, error) {
@@ -113,7 +122,11 @@ This input adds the following metadata fields to each message:
113122
Description("The maximum number of rows to be streamed in a single batch when taking a snapshot.").
114123
Default(1000),
115124
service.NewIntField(fieldSnapshotMaxParallelTables).
116-
Description("The maximum number of tables that may be snapshotted in parallel. When set to `1` (the default) tables are read sequentially using a single transaction, preserving the previous behaviour. When set higher, multiple `REPEATABLE READ` transactions are opened on separate connections under a single brief `FLUSH TABLES ... WITH READ LOCK` window so every worker observes an identical, globally-consistent snapshot at the same binlog position. A value greater than the number of configured `tables` is effectively capped at the table count. Must be between `1` and `256`.").
125+
Description("The maximum number of tables that may be snapshotted in parallel. When set to `1` (the default) tables are read sequentially using a single transaction, preserving the previous behaviour. When set higher, multiple `REPEATABLE READ` transactions are opened on separate connections under a single brief `FLUSH TABLES ... WITH READ LOCK` window so every worker observes an identical, globally-consistent snapshot at the same binlog position. Must be between `1` and `256`.").
126+
Advanced().
127+
Default(1),
128+
service.NewIntField(fieldSnapshotChunksPerTable).
129+
Description("The number of primary-key chunks each table is split into during the snapshot. When set to `1` (the default) each table is read as a single unit. When set higher, each table's first primary-key column is probed for `MIN` and `MAX` and the resulting integer range is split into N equal half-open chunks that are dispatched across the `"+fieldSnapshotMaxParallelTables+"` worker pool. This is how a single very large table is parallelised. Only tables whose first primary-key column is an integer type (`tinyint`, `smallint`, `mediumint`, `int`, `integer`, or `bigint`, signed or unsigned) are chunked; tables with non-numeric first PK columns fall back to a single whole-table read and log the reason. Composite primary keys are supported — chunking uses the leading column only, and per-chunk keyset pagination continues to respect the full PK ordering. Must be between `1` and `256`.").
117130
Advanced().
118131
Default(1),
119132
service.NewIntField(fieldMaxReconnectAttempts).
@@ -198,6 +211,7 @@ type mysqlStreamInput struct {
198211
checkPointLimit int
199212
fieldSnapshotMaxBatchSize int
200213
fieldSnapshotMaxParallelTables int
214+
fieldSnapshotChunksPerTable int
201215

202216
logger *service.Logger
203217
res *service.Resources
@@ -303,6 +317,16 @@ func newMySQLStreamInput(conf *service.ParsedConfig, res *service.Resources) (s
303317
return nil, fmt.Errorf("field '%s' must be at most %d, got %d", fieldSnapshotMaxParallelTables, maxSnapshotParallelTables, i.fieldSnapshotMaxParallelTables)
304318
}
305319

320+
if i.fieldSnapshotChunksPerTable, err = conf.FieldInt(fieldSnapshotChunksPerTable); err != nil {
321+
return nil, err
322+
}
323+
if i.fieldSnapshotChunksPerTable < 1 {
324+
return nil, fmt.Errorf("field '%s' must be at least 1, got %d", fieldSnapshotChunksPerTable, i.fieldSnapshotChunksPerTable)
325+
}
326+
if i.fieldSnapshotChunksPerTable > maxSnapshotChunksPerTable {
327+
return nil, fmt.Errorf("field '%s' must be at most %d, got %d", fieldSnapshotChunksPerTable, maxSnapshotChunksPerTable, i.fieldSnapshotChunksPerTable)
328+
}
329+
306330
if i.canalMaxConnAttempts, err = conf.FieldInt(fieldMaxReconnectAttempts); err != nil {
307331
return nil, err
308332
}
@@ -444,7 +468,7 @@ func (i *mysqlStreamInput) startMySQLSync(ctx context.Context, pos *position, sn
444468
if snapshot != nil {
445469
var startPos *position
446470
var err error
447-
if i.fieldSnapshotMaxParallelTables <= 1 {
471+
if i.fieldSnapshotMaxParallelTables <= 1 && i.fieldSnapshotChunksPerTable <= 1 {
448472
startPos, err = i.runSequentialSnapshot(ctx, snapshot)
449473
} else {
450474
startPos, err = i.runParallelSnapshot(ctx, snapshot)
@@ -516,12 +540,33 @@ func (i *mysqlStreamInput) runParallelSnapshot(ctx context.Context, snapshot *Sn
516540
db := snapshot.db
517541
snapshot.db = nil
518542

519-
set, startPos, err := prepareParallelSnapshotSet(ctx, i.logger, db, i.tables, i.fieldSnapshotMaxParallelTables)
543+
// Workers are capped by the plausible number of work units: at most
544+
// chunks_per_table * len(tables), and never more than requested. Planning
545+
// may emit fewer units (e.g. some tables fall back to whole-table reads)
546+
// but the over-provisioning cost is bounded and connections held by idle
547+
// workers are released when the snapshot completes.
548+
workerCount := i.fieldSnapshotMaxParallelTables
549+
if maxUnits := len(i.tables) * i.fieldSnapshotChunksPerTable; workerCount > maxUnits {
550+
workerCount = maxUnits
551+
}
552+
553+
set, startPos, err := prepareParallelSnapshotSet(ctx, i.logger, db, i.tables, workerCount)
520554
if err != nil {
521555
// prepareParallelSnapshotSet closed db on its own error paths.
522556
return nil, fmt.Errorf("unable to prepare parallel snapshot: %w", err)
523557
}
524-
if err := i.readSnapshotParallel(ctx, set); err != nil {
558+
559+
// Plan work units using any worker's consistent-snapshot transaction.
560+
// All workers observe identical state so MIN/MAX computed here apply
561+
// uniformly to every worker's subsequent reads.
562+
units, err := planSnapshotWork(ctx, set.workers[0], i.tables, i.fieldSnapshotChunksPerTable)
563+
if err != nil {
564+
_ = set.close()
565+
return nil, fmt.Errorf("plan snapshot work: %w", err)
566+
}
567+
i.logger.Infof("Parallel snapshot planned: %d tables -> %d work units across %d workers", len(i.tables), len(units), len(set.workers))
568+
569+
if err := i.readSnapshotParallel(ctx, set, units); err != nil {
525570
_ = set.close()
526571
return nil, fmt.Errorf("failed reading snapshot: %w", err)
527572
}
@@ -537,18 +582,22 @@ func (i *mysqlStreamInput) runParallelSnapshot(ctx context.Context, snapshot *Sn
537582

538583
func (i *mysqlStreamInput) readSnapshot(ctx context.Context, snapshot *Snapshot) error {
539584
for _, table := range i.tables {
540-
if err := i.readSnapshotTable(ctx, snapshot, table); err != nil {
585+
if err := i.readSnapshotWorkUnit(ctx, snapshot, snapshotWorkUnit{table: table}); err != nil {
541586
return err
542587
}
543588
}
544589
return nil
545590
}
546591

547-
// readSnapshotTable snapshots a single table by paging through its rows in
548-
// primary-key order using the REPEATABLE READ / CONSISTENT SNAPSHOT transaction
549-
// held by snapshot. Extracted so both the sequential (single-snapshot) and the
550-
// parallel (per-worker snapshot) paths share identical per-table semantics.
551-
func (i *mysqlStreamInput) readSnapshotTable(ctx context.Context, snapshot *Snapshot, table string) error {
592+
// readSnapshotWorkUnit snapshots one work unit — either a whole table or a
593+
// primary-key chunk of a table — by paging through its rows in primary-key
594+
// order using the REPEATABLE READ / CONSISTENT SNAPSHOT transaction held by
595+
// snapshot. When unit.bounds is nil the whole table is read; otherwise rows
596+
// are filtered by the chunk's [lowerIncl, upperExcl) range on the first PK
597+
// column. Both the sequential and the parallel paths use this same body so
598+
// per-table semantics are identical regardless of chunking configuration.
599+
func (i *mysqlStreamInput) readSnapshotWorkUnit(ctx context.Context, snapshot *Snapshot, unit snapshotWorkUnit) error {
600+
table := unit.table
552601
// Pre-populate schema cache so snapshot messages carry schema metadata.
553602
if tbl, err := i.canal.GetTable(i.mysqlConfig.DBName, table); err == nil {
554603
if _, err := i.getTableSchema(tbl); err != nil {
@@ -571,9 +620,9 @@ func (i *mysqlStreamInput) readSnapshotTable(ctx context.Context, snapshot *Snap
571620
for {
572621
var batchRows *sql.Rows
573622
if numRowsProcessed == 0 {
574-
batchRows, err = snapshot.querySnapshotTable(ctx, table, tablePks, nil, i.fieldSnapshotMaxBatchSize)
623+
batchRows, err = snapshot.querySnapshotTable(ctx, table, tablePks, unit.bounds, nil, i.fieldSnapshotMaxBatchSize)
575624
} else {
576-
batchRows, err = snapshot.querySnapshotTable(ctx, table, tablePks, &lastSeenPksValues, i.fieldSnapshotMaxBatchSize)
625+
batchRows, err = snapshot.querySnapshotTable(ctx, table, tablePks, unit.bounds, &lastSeenPksValues, i.fieldSnapshotMaxBatchSize)
577626
}
578627
if err != nil {
579628
return fmt.Errorf("executing snapshot table query: %s", err)

0 commit comments

Comments
 (0)