mysql_cdc: chunk large tables across workers via PK-range splitting #4342
Open
ankit481 wants to merge 3 commits into redpanda-data:main from
Conversation
Adds an opt-in `snapshot_max_parallel_tables` field to the `mysql_cdc` input. When left at the default (`1`) the snapshot flow is the existing single-transaction, single-goroutine path, bit-for-bit unchanged. When set above `1`, N REPEATABLE READ / CONSISTENT SNAPSHOT transactions are opened on independent connections under a single brief `FLUSH TABLES WITH READ LOCK` window. Every worker observes identical state at the same binlog position, and the configured tables are fanned out across the workers via an errgroup.

This preserves the existing global consistent-snapshot invariant and the existing fail-halt failure mode, while removing the per-table serial bottleneck for pipelines with many tables. The inner per-table loop is extracted into `readSnapshotTable` so both paths share identical semantics. The sequential path is moved into `runSequentialSnapshot` (unchanged body); the parallel path lives in `runParallelSnapshot` and `parallel_snapshot.go`.
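A minimal sketch of that consistent fan-out setup, assuming a `*sql.DB` pool; `openConsistentWorkers` and the error handling are illustrative, not the PR's actual code, and the errgroup table fan-out is omitted:

```go
// Sketch only: open n worker connections whose REPEATABLE READ snapshots
// all begin under one brief global read-lock window. Error paths elide
// worker-connection cleanup for brevity.
func openConsistentWorkers(ctx context.Context, db *sql.DB, n int) ([]*sql.Conn, error) {
	lock, err := db.Conn(ctx) // short-lived connection that holds FTWRL
	if err != nil {
		return nil, err
	}
	defer lock.Close() // releases the lock connection

	if _, err := lock.ExecContext(ctx, "FLUSH TABLES WITH READ LOCK"); err != nil {
		return nil, err
	}
	workers := make([]*sql.Conn, 0, n)
	for i := 0; i < n; i++ {
		c, err := db.Conn(ctx) // independent connection per worker
		if err != nil {
			return nil, err
		}
		// Pin each worker's read view now, while the lock is held, so all
		// workers observe identical state at the same binlog position.
		if _, err := c.ExecContext(ctx,
			"SET TRANSACTION ISOLATION LEVEL REPEATABLE READ"); err != nil {
			return nil, err
		}
		if _, err := c.ExecContext(ctx,
			"START TRANSACTION WITH CONSISTENT SNAPSHOT"); err != nil {
			return nil, err
		}
		workers = append(workers, c)
	}
	// The binlog position would be read here, while still locked, before
	// the lock is released and the workers start reading.
	_, err = lock.ExecContext(ctx, "UNLOCK TABLES")
	return workers, err
}
```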
Defense-in-depth against a mis-typed config value that would otherwise try to open thousands of MySQL connections at snapshot time. 256 sits well above any realistic pipeline (the existing cap at `len(tables)` is the more common practical bound) and well below the range where a typo (e.g. 10000) would cause a connection storm before MySQL's own `max_connections` kicked in. Surfaces as a clear configuration error at Connect time rather than a runtime "Too many connections" error from the server.
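In code this is just a bounds check before any connection is opened; a minimal sketch with illustrative names:

```go
// Illustrative Connect-time guard; the constant and field names are
// assumptions, not the PR's exact code.
const maxSnapshotParallelTables = 256

func validateSnapshotParallelism(n int) error {
	if n < 1 || n > maxSnapshotParallelTables {
		return fmt.Errorf(
			"snapshot_max_parallel_tables must be within [1, %d], got %d",
			maxSnapshotParallelTables, n)
	}
	return nil
}
```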
Adds an opt-in `snapshot_chunks_per_table` field to `mysql_cdc`. When left at its default (`1`) the snapshot flow is unchanged. When set higher, each table's first primary-key column is probed for MIN and MAX under the shared consistent-snapshot transaction and the resulting integer range is split into N half-open chunks that are dispatched across the existing `snapshot_max_parallel_tables` worker pool.

This is a follow-up to the inter-table parallelism introduced in the "mysql_cdc: parallelise snapshot reads across tables" change. Inter-table parallelism alone cannot accelerate a snapshot dominated by a single very large table, which is the most common shape for message/event tables. Chunking splits that single-table work across the worker pool instead.

Chunking is supported for tables whose first primary-key column is an integer type (tinyint/smallint/mediumint/int/integer/bigint, signed or unsigned). Composite primary keys are supported — chunking partitions on the leading column only, and per-chunk keyset pagination continues to respect the full PK ordering. Tables with non-numeric first PK columns fall back to a whole-table read with an informational log line, so mixed workloads keep working.

The consistency model is unchanged. All worker transactions still begin under one FLUSH TABLES WITH READ LOCK window, so every chunk observes identical state at the same binlog position. Planning runs inside one worker's snapshot transaction so MIN/MAX agree with what every worker subsequently reads. The outermost chunks in each table are open-ended (no lower bound on the first chunk, no upper bound on the last) so rows at the exact MIN/MAX endpoints and any rows outside [MIN, MAX] are captured rather than silently dropped.

The fan-out helper (previously `distributeTablesToWorkers`) is generalised to a generic `distributeWorkToWorkers` so the parallel path can dispatch chunk-typed work units while the existing fan-out tests keep passing with string inputs.

Field cap: `snapshot_chunks_per_table` is validated at config time to be within [1, 256], matching the pattern established for `snapshot_max_parallel_tables`.

Tests added:
- `snapshot_chunking_test.go`: `splitIntRange` coverage and overflow, `buildChunkPredicate` shapes, and generic fan-out against `snapshotWorkUnit`.
- `config_test.go`: default, explicit, and out-of-range values for `snapshot_chunks_per_table`.
- `integration_test.go`: `TestIntegrationMySQLChunkedSnapshot` exercises an int PK table and a composite (int, int) PK table with chunks=8 and asserts no duplicates across overlapping chunk ranges; `TestIntegrationMySQLChunkedSnapshotNonNumericPKFallback` confirms the VARCHAR-PK fallback reads the whole table without error.
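The integer-PK gate can be probed from `information_schema`; a hypothetical shape of that check (the join conditions and type list are assumptions, not the PR's exact query):

```go
// Integer types whose leading PK column qualifies a table for chunking.
var chunkablePKTypes = map[string]bool{
	"tinyint": true, "smallint": true, "mediumint": true,
	"int": true, "integer": true, "bigint": true,
}

// Fetches the name and type of the first column of the PRIMARY key.
const firstPKColumnTypeQuery = `
SELECT c.column_name, LOWER(c.data_type)
FROM information_schema.key_column_usage k
JOIN information_schema.columns c
  ON  c.table_schema = k.table_schema
  AND c.table_name   = k.table_name
  AND c.column_name  = k.column_name
WHERE k.constraint_name  = 'PRIMARY'
  AND k.ordinal_position = 1
  AND k.table_schema     = ?
  AND k.table_name       = ?`
```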
Summary
Closes #4341. Builds on #4320.
Adds an opt-in `snapshot_chunks_per_table` field to `mysql_cdc`. When left at the default (`1`) the snapshot flow is unchanged from #4320. When set higher, each table's first primary-key column is probed for `MIN`/`MAX` under the shared consistent-snapshot transaction and the resulting integer range is split into N half-open chunks that are dispatched across the existing `snapshot_max_parallel_tables` worker pool.

This is the intra-table parallelism piece. #4320 unblocks pipelines with many tables; this PR unblocks pipelines dominated by a single very large table — the shape behind the 400M-row reference workload in #4341.
Motivation
Inter-table parallelism alone cannot accelerate a snapshot where one table holds the bulk of the rows. Splitting that table across the worker pool is what closes the gap to AWS DMS.
Target from #4341: 400M rows in ~45 min (1 h acceptable). At 16 workers each reading a chunked slice of the PK space, the per-worker throughput needed is ~25M rows/hr, which the existing per-worker code path already achieves: the observed baseline on commodity RDS hardware is ~30M rows/hr.
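Spelling out the arithmetic:

$$
\frac{400\,\text{M rows}}{16\ \text{workers}\times 1\,\text{h}} = 25\,\text{M rows/hr per worker},
\qquad
\frac{400\,\text{M rows}}{16\ \text{workers}\times 0.75\,\text{h}} \approx 33\,\text{M rows/hr per worker}
$$

The 1 h bound fits under the ~30M rows/hr baseline; the ~45 min target slightly exceeds it, which is presumably why 1 h is stated as acceptable.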
Design
Consistency model is unchanged
Every worker transaction is still opened inside the single `FLUSH TABLES WITH READ LOCK` window established by `prepareParallelSnapshotSet`. MIN/MAX probing runs inside one of those worker transactions, so boundaries computed during planning agree exactly with the state every worker subsequently reads. The binlog position captured under the lock applies uniformly to every chunk.

No new lock acquisition, no relaxation of isolation, no new handoff with the binlog stream.
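The planning probe amounts to one aggregate query on a worker's snapshot connection; a sketch with assumed names (not the PR's actual helper):

```go
// Illustrative MIN/MAX probe, run inside one worker's snapshot transaction
// so the bounds agree with the state every worker reads.
func probePKRange(ctx context.Context, conn *sql.Conn, table, pkCol string) (lo, hi int64, ok bool, err error) {
	var min, max sql.NullInt64
	q := fmt.Sprintf("SELECT MIN(`%s`), MAX(`%s`) FROM %s", pkCol, pkCol, table)
	if err = conn.QueryRowContext(ctx, q).Scan(&min, &max); err != nil {
		return 0, 0, false, err
	}
	if !min.Valid { // empty table: caller emits one whole-table unit
		return 0, 0, false, nil
	}
	return min.Int64, max.Int64, true, nil
}
```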
Chunking math
For each table:
- `chunks_per_table <= 1`: emit one whole-table unit (no planning query).
- `chunks_per_table > 1`: probe `MIN(pk)`, `MAX(pk)`, split `[MIN, MAX]` into N half-open `[lo, hi)` chunks (sketched below).
- Outermost chunks are open-ended — the first chunk has no lower bound and the last chunk has no upper bound. This guarantees every row in `[MIN, MAX]` is covered without off-by-one risk, and that any row outside `[MIN, MAX]` under the snapshot is still picked up rather than silently dropped.
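A sketch of a splitter with these semantics; the real `splitIntRange` in `snapshot_chunking.go` may differ in detail:

```go
// chunkBounds describes one half-open chunk; a nil bound is open-ended.
type chunkBounds struct {
	lo, hi *int64 // [lo, hi); nil = unbounded on that side
}

func splitIntRange(lo, hi int64, n int) []chunkBounds {
	if n <= 1 || hi <= lo {
		return []chunkBounds{{}} // one fully-open, whole-table chunk
	}
	// Subtract in uint64 so spans near the int64 limits cannot overflow.
	step := (uint64(hi) - uint64(lo)) / uint64(n)
	if step == 0 {
		step = 1 // more chunks requested than values in the span
	}
	out := make([]chunkBounds, 0, n)
	cur := lo
	for i := 0; i < n; i++ {
		var b chunkBounds
		if i > 0 {
			v := cur
			b.lo = &v // inclusive lower bound
		}
		if i < n-1 {
			next := cur + int64(step)
			if next <= cur || next >= hi {
				// Span exhausted (or addition wrapped): finish with an
				// open-ended final chunk and stop early.
				out = append(out, chunkBounds{lo: b.lo})
				return out
			}
			v := next
			b.hi = &v // exclusive upper bound
			cur = next
		}
		out = append(out, b)
	}
	return out
}
```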
Composite primary keys

Chunking partitions on the leading PK column only. Per-chunk keyset pagination inside `querySnapshotTable` continues to use the full PK tuple, so ordering and pagination remain correct for composite PKs such as `(tenant_id, id)`.

Tradeoff: a skewed leading column produces uneven chunks. Operators with that data shape should leave `snapshot_chunks_per_table` at `1` and rely on `snapshot_max_parallel_tables` alone. This is a documented limitation, not a correctness issue — no row is ever read twice, and no row is ever missed.
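For a composite PK such as `(tenant_id, id)`, the per-chunk query would combine a leading-column chunk predicate with a full-tuple seek; an assumed illustration (table and column names are not from the PR):

```go
// Illustrative only: chunk bounds constrain the leading PK column, while
// keyset pagination compares the full PK tuple (MySQL row-value syntax).
const compositeChunkQuery = `
SELECT *
FROM events
WHERE tenant_id >= ? AND tenant_id < ?   -- chunk bounds, leading column only
  AND (tenant_id, id) > (?, ?)           -- keyset: resume after the last row
ORDER BY tenant_id, id
LIMIT ?`
```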
SQL shape

Example: `chunks_per_table=4` on an `INT` PK with range `[0, 100)`, 2nd chunk, mid-pagination. Bindings: `[25, 50, lastSeenID, limit]`.

First chunk omits the lower bound. Last chunk omits the upper bound. Middle chunks have both.
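A plausible shape for that example, consistent with the bindings above; table and column names are assumed and the generated SQL may differ cosmetically:

```go
// chunks_per_table=4 on an INT PK over [0, 100), 2nd chunk, mid-pagination.
const secondChunkQuery = `
SELECT *
FROM events
WHERE id >= ?   -- chunk lower bound: 25
  AND id < ?    -- chunk upper bound: 50
  AND id > ?    -- keyset cursor: lastSeenID
ORDER BY id
LIMIT ?`
```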
Files
- `internal/impl/mysql/snapshot_chunking.go` (new): `planSnapshotWork`, `splitIntRange`, `buildChunkPredicate`, numeric-PK detection via `information_schema.columns`.
- `internal/impl/mysql/snapshot.go`: `querySnapshotTable` threads `*chunkBounds` through the `WHERE` clause. The existing `buildOrderByClause` and keyset pagination are untouched.
- `internal/impl/mysql/input_mysql_stream.go`: new `snapshot_chunks_per_table` field with `[1, 256]` validation, renamed `readSnapshotTable` -> `readSnapshotWorkUnit`; the chunking plan runs inside `runParallelSnapshot`.
- `internal/impl/mysql/parallel_snapshot.go`: `distributeTablesToWorkers` generalised to `distributeWorkToWorkers[T any]` (sketched below) so work units of type `snapshotWorkUnit` use the same fan-out code path as tables did before. Removed the internal `workerCount > len(tables)` cap — the caller sizes the pool against the expected work-unit count.
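The generalised fan-out is essentially a partition over a type parameter; a sketch (the real helper's distribution strategy may differ, e.g. contiguous slices rather than round-robin):

```go
// Illustrative round-robin fan-out of work units across workers.
func distributeWorkToWorkers[T any](work []T, workers int) [][]T {
	if workers < 1 {
		workers = 1
	}
	out := make([][]T, workers)
	for i, w := range work {
		out[i%workers] = append(out[i%workers], w)
	}
	return out
}
```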
Dispatch

`startMySQLSync` now routes to `runParallelSnapshot` whenever either `snapshot_max_parallel_tables > 1` or `snapshot_chunks_per_table > 1`. When both are 1 (the default) the original sequential path runs unchanged.
Backwards compatibility

- Default `snapshot_chunks_per_table: 1` produces byte-identical behaviour to #4320.
- The new field is an `Advanced()` int field. Existing YAML is unaffected.
- `runSequentialSnapshot` is untouched.
- At `chunks_per_table=1`, every work unit has `bounds: nil`, so `querySnapshotTable` emits the same `WHERE`-less query as before (just via a slightly different code path).
- The existing integration tests (`TestIntegrationMySQLSnapshotAndCDC`, `TestIntegrationMySQLSnapshotConsistency`, `TestIntegrationMySQLCDCWithCompositePrimaryKeys`, `TestIntegrationMySQLCDCSchemaMetadata`, `TestIntegrationMySQLParallelSnapshot`) all pass unchanged.
Tests added

Unit (`snapshot_chunking_test.go`)

Pure-function coverage of the chunking math and SQL predicate:

- `SingleChunkWhenNLEOne` — `n` of 0, 1, -3 all produce one fully-open chunk.
- `SingleChunkWhenRangeCollapsed` — `lo == hi` and reversed ranges degenerate to one chunk.
- `OutermostChunksAreOpenEnded` — first chunk `lo == nil`, last chunk `hi == nil`.
- `ChunksCoverAllIntegersExactlyOnce` — enumerates every integer in `[lo, hi]` for several `n` and asserts single-chunk membership under half-open semantics.
- `WhenNExceedsSpanStepIsAtLeastOne` — short ranges asked for many chunks still cover every value.
- `LargeSpanDoesNotOverflow` — `hi - lo` near `int64` limits, guards the `uint64` cast in `splitIntRange`.
- `BuildChunkPredicate_*` — nil, both-bounds, lower-only, upper-only, and fully-open variants produce the expected SQL fragment and arg list.
- `DistributeWorkToWorkers_SnapshotWorkUnitInstantiation` — the generic fan-out helper accepts the new work-unit type and visits every item exactly once.

Existing `distributeTablesToWorkers` tests continue to pass — they now exercise `distributeWorkToWorkers` at `T = string`.

Config (`config_test.go`)

- `TestConfig_SnapshotChunksPerTable_DefaultAndExplicit` — default of 1, explicit 16 round-trips through the spec.
- `TestConfig_SnapshotChunksPerTable_InvalidValuesRejected` — zero, negative, above-cap, and absurdly-large values all violate the constructor's validation predicate.

Integration (`integration_test.go`)

- `TestIntegrationMySQLChunkedSnapshot` — MySQL 8.0 via testcontainers. Creates one `INT` PK table and one composite `(tenant_id, id)` PK table, each loaded with 2000 rows. Runs `mysql_cdc` with `snapshot_max_parallel_tables: 4, snapshot_chunks_per_table: 8`. Asserts: every row emitted exactly once, no duplicates from overlapping chunk ranges (tracked via a `sync.Map` of observed PKs), post-snapshot inserts are picked up by the binlog stream.
- `TestIntegrationMySQLChunkedSnapshotNonNumericPKFallback` — `VARCHAR` PK table with `chunks_per_table: 8`. Verifies the fallback path reads the whole table without error and emits every row.
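The no-duplicates assertion reduces to a `LoadOrStore` per emitted PK; a sketch of the idea (helper name is illustrative):

```go
// Sketch of the duplicate check described above: any PK observed twice
// across chunks fails the test immediately.
var seen sync.Map

func assertUnseen(t *testing.T, pk any) {
	t.Helper()
	if _, dup := seen.LoadOrStore(pk, struct{}{}); dup {
		t.Fatalf("primary key %v emitted more than once", pk)
	}
}
```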
Local test results

Unit (whole package, race + shuffle):

Integration — new tests:

Integration — existing sequential-path regressions (backwards-compat sanity check):

`gofmt` and `go vet` clean.

Log excerpt from `TestIntegrationMySQLChunkedSnapshot` confirming the planner emits 16 work units (2 tables x 8 chunks) across 4 workers, with correct open-ended outermost chunks and full-tuple keyset pagination for composite PKs:

Out of scope / follow-ups
- `OFFSET`-based boundary discovery for non-numeric PKs; material complexity best kept behind its own config flag in a future PR.
- Even range splitting (or `chunks_per_table=1` for skewed keys) is sufficient for the common case; adaptive partitioning is a separate feature.

Test plan

- `internal/impl/mysql` with `-race -shuffle=on`
- `gofmt` / `go vet` cleanliness