mysql_cdc: parallel snapshot support#4367
Conversation
Adds an opt-in `snapshot_max_parallel_tables` field to the `mysql_cdc` input. When left at the default (`1`) the snapshot flow is the existing single-transaction, single-goroutine path: bit-for-bit unchanged. When set above `1`, N REPEATABLE READ / CONSISTENT SNAPSHOT transactions are opened on independent connections under a single brief FLUSH TABLES ... WITH READ LOCK window. Every worker observes identical state at the same binlog position, and the configured tables are fanned out across the workers via an errgroup. This preserves the existing global consistent-snapshot invariant and the existing fail-halt failure mode, while removing the per-table serial bottleneck for pipelines with many tables. The inner per-table loop is extracted into readSnapshotTable so both paths share identical semantics. The sequential path is moved into runSequentialSnapshot (unchanged body); the parallel path lives in runParallelSnapshot and parallel_snapshot.go.
Defense-in-depth against a mis-typed config value that would otherwise try to open thousands of MySQL connections at snapshot time. 256 sits well above any realistic pipeline (the existing cap at len(tables) is the more common practical bound) and well below the range where a typo (e.g. 10000) would cause a connection storm before MySQLs own max_connections kicked in. Surfaces as a clear configuration error at Connect time rather than a runtime too-many-connections from the server.
| assert.True(t, | ||
| got < 1 || got > maxSnapshotParallelTables, | ||
| "configured value should violate the [1, %d] range enforced in newMySQLStreamInput", maxSnapshotParallelTables, | ||
| ) |
There was a problem hiding this comment.
This assertion is tautological and does not actually verify the validation logic in newMySQLStreamInput. The expression got < 1 || got > maxSnapshotParallelTables is trivially true for the hardcoded inputs 0, -5, maxSnapshotParallelTables + 1, and 10000 — it tests the input value, not any production-code behavior. If the validation block at input_mysql_stream.go#L301-L306 were removed, this test would still pass.
Per the project test patterns, newMySQLStreamInput can be invoked with service.MockResources() plus license.InjectTestService(resources). The snapshot-range validation runs before the cache HasCache check, so calling the constructor directly and asserting require.ErrorContains(t, err, "must be at least 1") / "must be at most 256" would actually exercise the rejection path.
|
Commits Review
|
No description provided.