[FLINK-39478][mysql] Persist checkpointIdToFinish and expose enumerator metrics #4410

Open
jubins wants to merge 1 commit into apache:master from jubins:j-flink-39478-ghost-tables-after-job-restart

@jubins jubins commented May 24, 2026

What is the purpose of the change

Fixes FLINK-39478 — when scan.newly-added-table.enabled=true and a job restarts mid-snapshot of a newly-added table, the affected table can silently stop emitting events for the rest of the job's lifetime. The root cause is that MySqlSnapshotSplitAssigner.checkpointIdToFinish — the field that gates the NEWLY_ADDED_ASSIGNING → NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED transition — is transient (@Nullable, not in SnapshotPendingSplitsState), and on restore must be re-derived over two more checkpoint cycles (one snapshotState() to set it, one notifyCheckpointComplete() to act on it). If a second restart lands inside that recovery window, the field resets and the state machine stalls. The table remains in the pipeline config and the job appears healthy, but produces zero data thereafter. Reported in production on two ~100M-row tables; the non-deterministic "survived first restart, died on second" pattern is consistent with the timing race described above.

This change persists checkpointIdToFinish into the checkpointed state so the state machine can resume in one step after restore, and adds enumerator-side metrics (assignerStatus, aggregate snapshot progress counters) so operators can observe whether the assigner is making progress during long newly-added-table snapshots — the lack of which made the original incident extremely hard to diagnose. The companion fix for the multi-step BinlogSplitUpdateRequest/Ack handshake between states 3→4 (fix direction #2 in the JIRA) is deferred to a follow-up because it touches the enumerator↔reader RPC protocol and has a larger blast radius — this PR addresses what we believe is the most likely root cause of the observed incidents, plus the observability gap.
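The race described above can be illustrated with a toy model. This is a minimal sketch, not the connector's code: `AssignerSketch`, its two-value `Status` enum, the `allSplitsFinished` flag, and the `restore` helper are hypothetical names introduced only for illustration, and the gating conditions are simplified. It shows why a restore that loses `checkpointIdToFinish` cannot transition on the next completed checkpoint, while a restore that carries the value can.

```java
/** Toy model of the gating logic. All names here are illustrative assumptions. */
final class AssignerSketch {
    enum Status { NEWLY_ADDED_ASSIGNING, NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED }

    Status status = Status.NEWLY_ADDED_ASSIGNING;
    Long checkpointIdToFinish;   // null until a checkpoint records it
    boolean allSplitsFinished;   // simplified stand-in for the real completion check

    /** Records the gating checkpoint id first, then returns the value that would be persisted. */
    Long snapshotState(long checkpointId) {
        if (checkpointIdToFinish == null
                && allSplitsFinished
                && status == Status.NEWLY_ADDED_ASSIGNING) {
            checkpointIdToFinish = checkpointId;
        }
        return checkpointIdToFinish;
    }

    /** Transitions only if a gating checkpoint id is known and has completed. */
    void notifyCheckpointComplete(long checkpointId) {
        if (checkpointIdToFinish != null
                && allSplitsFinished
                && checkpointId >= checkpointIdToFinish) {
            status = Status.NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED;
        }
    }

    /** Rebuilds an assigner from restored state; pass null to model the unpersisted field. */
    static AssignerSketch restore(Long persistedCheckpointIdToFinish, boolean allSplitsFinished) {
        AssignerSketch restored = new AssignerSketch();
        restored.checkpointIdToFinish = persistedCheckpointIdToFinish;
        restored.allSplitsFinished = allSplitsFinished;
        return restored;
    }
}
```

In this model, an assigner restored with `null` needs another `snapshotState()` call before any `notifyCheckpointComplete()` can move it forward, which is the window a second restart can hit. An assigner restored with the persisted id transitions on the first completed checkpoint.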

Brief change log

  • Added @Nullable Long checkpointIdToFinish to SnapshotPendingSplitsState (flink-connector-mysql-cdc), with a new 11-arg constructor. The pre-existing 10-arg constructor is preserved as a delegating overload that defaults the new field to null, so all existing call sites compile unchanged and no test fixture data needs to change.
  • snapshotState() in MySqlSnapshotSplitAssigner now sets checkpointIdToFinish before building the state object, so the value is captured in the same checkpoint instead of the next one — eliminating the 2-checkpoint recovery window after a restart.
  • Bumped PendingSplitsStateSerializer from VERSION = 5 to VERSION = 6. The serialize path writes a trailing boolean present + long value; the deserialize path reads the trailer only when version >= 6. v5 payloads continue to deserialize with checkpointIdToFinish=null, so no savepoint migration is required.
  • New MySqlSourceEnumeratorMetrics (flink-connector-mysql-cdc) exposes the following gauges on the enumerator metric group: assignerStatus (status code), assignerStatusName (status name), numRemainingTables, numRemainingSnapshotSplits, numAssignedSnapshotSplits, numFinishedSnapshotSplits, numAlreadyProcessedTables. Registered from MySqlSourceEnumerator#start, wrapped in a try/catch so metric-registration failures can never break enumerator startup.
  • Added five default int get*Count() accessors to MySqlSplitAssigner (getRemainingSplitsCount, getRemainingTablesCount, getAssignedSplitsCount, getFinishedSplitsCount, getAlreadyProcessedTablesCount). Default returns 0, preserving source-compatibility for any external implementor. Overridden in MySqlSnapshotSplitAssigner and delegated in MySqlHybridSplitAssigner; MySqlBinlogSplitAssigner uses the no-op defaults (its phase has no snapshot work to report).
  • Extended PendingSplitsStateSerializerTest with a new round-trip case carrying a non-null checkpointIdToFinish, plus a v5-payload backward-compat test confirming the field reads back as null on the old code path.
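The version-gated trailer from the serializer bullet can be sketched as follows. This is an illustration under stated assumptions rather than the actual `PendingSplitsStateSerializer`: the class `TrailerCodecSketch`, its method names, and the single UTF string standing in for the rest of the state are all hypothetical. It also assumes the long is written only when the presence flag is true.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical codec showing a nullable trailing field gated on the payload version. */
final class TrailerCodecSketch {
    static final int VERSION_WITH_TRAILER = 6;

    /** Writes a placeholder body, then the trailer only for version 6 or later. */
    static byte[] serialize(int version, String body, Long checkpointIdToFinish) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(body); // stand-in for the existing state fields
            if (version >= VERSION_WITH_TRAILER) {
                out.writeBoolean(checkpointIdToFinish != null);
                if (checkpointIdToFinish != null) {
                    out.writeLong(checkpointIdToFinish);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the trailer only when the payload version carries it; older payloads yield null. */
    static Long readCheckpointIdToFinish(int version, byte[] payload) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
            in.readUTF(); // skip the stand-in body
            if (version < VERSION_WITH_TRAILER) {
                return null;
            }
            if (in.readBoolean()) {
                return in.readLong();
            }
            return null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the new field is appended after all existing fields and read only behind the version check, a reader handling an older payload never consumes bytes that are not there.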

Verifying this change

This change persists one additional field and registers new metric gauges; no existing data path or split-assignment logic is altered. Verification:

  • New parameterized test variant in PendingSplitsStateSerializerTest (getTestSnapshotPendingSplitsStateWithCheckpointIdToFinish) covers snapshot- and hybrid-state round-trips with a non-null checkpointIdToFinish (42L). All 31 tests in the class pass.
  • New testDeserializeV5MissingCheckpointIdToFinish asserts that a payload deserialized under the previous version number reads checkpointIdToFinish=null and does not attempt to read the v6 trailer — exercises the version-gated read branch.
  • Existing equals/hashCode round-trip in PendingSplitsStateSerializerTest now also exercises the new field via state equality (the field is included in both methods).
  • mvn test-compile passes for the whole flink-connector-mysql-cdc module, validating that the new interface defaults don't break any existing implementor (including the test-only assigner subclasses).
  • mvn spotless:check passes on the changed module.
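The compatibility argument behind the interface defaults can be shown with a small sketch. The types below are hypothetical stand-ins (`SplitAssignerSketch` and the three implementors are not the connector's classes), and only one of the five accessors is modeled. The point is that an implementor that predates the new method still compiles and reports 0, while the snapshot and hybrid variants override or delegate.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for the assigner interface; only one accessor is shown. */
interface SplitAssignerSketch {
    /** Default keeps implementors that do not know about this method compiling. */
    default int getRemainingSplitsCount() {
        return 0;
    }
}

/** Overrides the default with a real count. */
final class SnapshotAssignerSketch implements SplitAssignerSketch {
    private final Deque<String> remainingSplits = new ArrayDeque<>();

    void addSplit(String splitId) {
        remainingSplits.add(splitId);
    }

    @Override
    public int getRemainingSplitsCount() {
        return remainingSplits.size();
    }
}

/** Delegates to the wrapped snapshot assigner. */
final class HybridAssignerSketch implements SplitAssignerSketch {
    private final SnapshotAssignerSketch snapshotAssigner;

    HybridAssignerSketch(SnapshotAssignerSketch snapshotAssigner) {
        this.snapshotAssigner = snapshotAssigner;
    }

    @Override
    public int getRemainingSplitsCount() {
        return snapshotAssigner.getRemainingSplitsCount();
    }
}

/** Has no snapshot work to report, so it relies on the default. */
final class BinlogAssignerSketch implements SplitAssignerSketch {
}
```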

Note: MySqlSnapshotSplitAssignerTest and MySqlHybridSplitAssignerTest were not runnable in the local environment because they spin up MySQL via testcontainers (Docker not available). They should pass unchanged in CI — none of their fixture data construction needed updating thanks to the preserved 10-arg constructor overload.

A deterministic IT case that injects a restart between snapshotState() setting checkpointIdToFinish and notifyCheckpointComplete() acting on it — i.e., a regression test that fails on master and passes on this branch — is the natural next step but is outside the scope of this PR. Happy to add it as a follow-up if reviewers prefer; it requires some thought about how to deterministically schedule the restart at the vulnerable window without flaking.

Does this pull request potentially affect one of the following parts

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): the MySqlSplitAssigner interface is @Internal, not @PublicEvolving — the new default methods are nonetheless source- and binary-compatible for any external implementor (defaults return 0). SnapshotPendingSplitsState is also internal to the connector. No @PublicEvolving surface is touched.
  • The serializers: yes — PendingSplitsStateSerializer is bumped from version 5 to version 6. The new payload appends a boolean + (optional) long. The deserializer accepts versions 1–6; v5 payloads deserialize unchanged (with checkpointIdToFinish=null), so existing savepoints/checkpoints restore without migration.
  • The runtime per-record code paths (performance sensitive): no — checkpointIdToFinish is read/written only at checkpoint and restore. Metric gauges are lazily evaluated by Flink's reporter; their suppliers are O(1) (size lookups + a field read).
  • Anything that affects deployment or recovery (JobManager, Checkpointing, Kubernetes/Yarn, ZooKeeper): yes — the checkpointed enumerator state schema changes (additive, backward-compatible). Existing checkpoints/savepoints restore cleanly via the v5 read path.
  • The S3 file system connector: no
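The two metric properties claimed above (lazy, O(1) suppliers, and registration that cannot break startup) can be sketched as follows. This does not use Flink's metric API: `GaugeRegistry` is a hypothetical stand-in for a metric group, `java.util.function.Supplier` stands in for a gauge, and `EnumeratorMetricsSketch.register` is an invented helper. Only the gauge name `numRemainingSnapshotSplits` comes from the PR description.

```java
import java.util.Collection;
import java.util.function.Supplier;

/** Hypothetical sketch of guarded gauge registration; not Flink's metric API. */
final class EnumeratorMetricsSketch {

    /** Stand-in for a metric group: takes a name and a lazily evaluated supplier. */
    interface GaugeRegistry {
        void gauge(String name, Supplier<Integer> supplier);
    }

    /**
     * Registers a gauge whose supplier is a constant-time size lookup.
     * Returns false instead of propagating a registration failure, so the
     * caller's startup path can continue without metrics.
     */
    static boolean register(GaugeRegistry registry, Collection<?> remainingSplits) {
        try {
            // The supplier is evaluated on each read, not at registration time.
            registry.gauge("numRemainingSnapshotSplits", remainingSplits::size);
            return true;
        } catch (RuntimeException e) {
            return false;
        }
    }
}
```

Since the supplier captures the collection rather than a snapshot of its size, each read reflects the current state at the cost of a single `size()` call.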

Documentation

  • Does this pull request introduce a new feature? It introduces new enumerator metrics (assignerStatus, plus aggregate snapshot progress gauges) on the MySQL CDC source. The state-persistence change is a bug fix rather than a feature.
  • If yes, how is the feature documented? Inline Javadoc on MySqlSourceEnumeratorMetrics and on each metric-name constant documents the contract. The metric names follow the same convention as the existing SourceEnumeratorMetrics in flink-cdc-base. No standalone docs page change is required for this PR; if maintainers prefer, the MySQL connector's metrics table in the docs can be extended in a follow-up.

Was generative AI tooling used to co-author this PR?

  • Yes — Claude was used as a pair-programming assistant for discussing the approach, locating the fragile state-machine windows in the code, and drafting the implementation. All code was reviewed, understood, and verified by the author.

Generated-by: Claude Opus 4.7
