[fix] abort stream load 2pc by label #659

Open
liujiwen-up wants to merge 1 commit into apache:master from liujiwen-up:stream-load-label-abort


@liujiwen-up
Contributor

Proposed Changes

This PR improves Stream Load 2PC recovery by aborting lingering pre-committed transactions by their exact load label, instead of issuing an empty Stream Load request and parsing the transaction ID out of the label-already-exists response.

Main changes

  • Add LoadState and support querying Doris FE through:
    • GET /api/{db}/get_load_state?label={label}
  • Change DorisStreamLoad#abortPreCommit to:
    • rebuild the expected label
    • query the load state by label
    • abort pending transactions through /_stream_load_2pc with the label header
    • stop when the label state is UNKNOWN
    • skip already ABORTED labels
    • fail fast when the label is already COMMITTED or VISIBLE
  • Make 2PC fallback labels deterministic when the original table-based label is invalid because of long or non-ASCII table names.
  • Rebuild recovered labels using the DorisWriterState table identity and subtask id, instead of the current writer identity.
  • Improve response matching for committed/already-aborted transaction messages.
  • Add focused unit tests for:
    • load state parsing
    • label-based abort path
    • recovered writer identity
    • deterministic fallback labels for long/non-ASCII table names
    • committed/already-aborted response handling
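
The state handling listed for DorisStreamLoad#abortPreCommit can be sketched roughly as below. This is a minimal illustration, not the PR's code: the class name, the Action enum, and the decide method are hypothetical, and two choices are assumptions of this sketch, namely that a missing state is handled like UNKNOWN and that any state not named in the list (for example PRECOMMITTED) counts as pending and should be aborted by label.

```java
import java.util.Locale;

// Hypothetical sketch of the recovery decision; names are illustrative only.
final class AbortDecisionSketch {
    enum Action { ABORT_BY_LABEL, STOP, SKIP, FAIL_FAST }

    // Maps the load state reported for a label to a recovery action.
    static Action decide(String loadState) {
        if (loadState == null) {
            return Action.STOP; // assumption: a missing state is treated like UNKNOWN
        }
        switch (loadState.trim().toUpperCase(Locale.ROOT)) {
            case "UNKNOWN":
                return Action.STOP;           // stop when the label state is UNKNOWN
            case "ABORTED":
                return Action.SKIP;           // skip labels that are already aborted
            case "COMMITTED":
            case "VISIBLE":
                return Action.FAIL_FAST;      // data is already committed, so fail fast
            default:
                return Action.ABORT_BY_LABEL; // assumption: anything else is still pending
        }
    }

    public static void main(String[] args) {
        for (String s : new String[] {"PRECOMMITTED", "UNKNOWN", "ABORTED", "VISIBLE"}) {
            System.out.println(s + " -> " + decide(s));
        }
    }
}
```

Keeping the decision as a pure function of the state string makes each branch easy to unit test without a running Doris FE.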

Motivation

The previous recovery abort logic depended on reconstructing a label, issuing an empty Stream Load request, then parsing the txn id from the returned label conflict message.

That is fragile for labels generated from long or non-ASCII table names. In those cases the old fallback label used a random UUID, so the label could not be reliably reconstructed during recovery, and the lingering pre-committed transaction might not be aborted.

Using the exact label with Doris Stream Load 2PC abort avoids this indirect txn-id parsing path.
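
To illustrate why a deterministic fallback matters, here is a minimal sketch of one way to derive a reproducible label when the table-based label is invalid. It is not the connector's actual LabelGenerator: the label layout (prefix, table, subtask id, checkpoint id), the validity pattern, and the use of a name-based UUID as the table digest are all assumptions made for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.regex.Pattern;

// Hypothetical sketch; the real label format and validity rule may differ.
final class DeterministicLabelSketch {
    // Assumed validity rule: ASCII letters, digits, '-', '_', ':' and at most 128 chars.
    private static final Pattern VALID = Pattern.compile("^[-_A-Za-z0-9:]{1,128}$");

    static String label(String prefix, String table, int subtaskId, long checkpointId) {
        String direct = prefix + "_" + table + "_" + subtaskId + "_" + checkpointId;
        if (VALID.matcher(direct).matches()) {
            return direct;
        }
        // Name-based UUID: the same table name always yields the same digest,
        // unlike UUID.randomUUID(), so recovery can rebuild the identical label.
        String digest =
                UUID.nameUUIDFromBytes(table.getBytes(StandardCharsets.UTF_8)).toString();
        return prefix + "_" + digest + "_" + subtaskId + "_" + checkpointId;
    }

    public static void main(String[] args) {
        System.out.println(label("p", "orders", 0, 7));
        // Non-ASCII table name ("\u8ba2\u5355") forces the deterministic fallback.
        System.out.println(label("p", "\u8ba2\u5355", 0, 7));
    }
}
```

Because the digest depends only on the table name, a restarted job that knows the prefix, table, subtask id, and checkpoint id can recompute the same label. The sketch does not guard against an overly long prefix, which a real implementation would also need to handle.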

Compatibility Notes

This change requires Doris server support for:

  • GET /api/{db}/get_load_state?label=...
  • /_stream_load_2pc abort by label

These APIs are available in Doris 2.1.0 and later.
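
For orientation, the load-state endpoint above could be addressed as in the following sketch. The class and method names, the plain-HTTP scheme, and the fe-host:8030 address are assumptions for illustration; the sketch only builds the URI and does not show authentication or response parsing.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper; only the path shape comes from the PR description.
final class LoadStateUrlSketch {
    // Builds GET /api/{db}/get_load_state?label={label} for a given FE address.
    static URI loadStateUri(String feHostPort, String db, String label) {
        // The label is form-encoded for the query string; db is assumed to be
        // a plain identifier that needs no escaping in the path.
        String encodedLabel = URLEncoder.encode(label, StandardCharsets.UTF_8);
        return URI.create(
                "http://" + feHostPort + "/api/" + db + "/get_load_state?label=" + encodedLabel);
    }

    public static void main(String[] args) {
        System.out.println(loadStateUri("fe-host:8030", "db1", "p_orders_0_7"));
    }
}
```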

For transactions created by older connector versions where the invalid-label fallback used a random UUID, the original label still cannot be reconstructed if it was not persisted in state. This PR guarantees deterministic fallback label generation for newly created 2PC labels.

Tests

mvn -Pflink1 -pl flink-doris-connector-base \
  -Dtest=TestLabelGenerator,TestDorisWriter,TestDorisStreamLoad,TestRestService,TestResponseUtil test


Copilot AI left a comment

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.
