[Fix][Zeta] Guard state cleanup races after node failure #10687

Open
zhangshenghang wants to merge 27 commits into apache:dev from zhangshenghang:fix/zeta-state-cleanup-convergence

Conversation

Member

@zhangshenghang commented Apr 1, 2026

Purpose of this pull request

This PR fixes an engine-side terminal-state convergence bug after worker node failure.

When a worker goes offline, the engine can start cleaning distributed state from the running job state maps before all asynchronous task/pipeline/job callbacks have finished. In the current code path, PhysicalVertex, SubPlan, and PhysicalPlan can observe missing state entries and throw NullPointerException, which interrupts terminal-state convergence and may leave the job hanging in an intermediate state.

Instead of relying on local fallback state, this PR changes the cleanup strategy:

  • keep terminal job/pipeline/task state in distributed maps for a short cleanup delay window
  • remove runningJobInfoIMap immediately so terminal jobs are not restored on master switch
  • delay physical removal of distributed state maps until late callbacks have time to drain
  • treat already-cleaned state as a no-op defensive path instead of rebuilding distributed state
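
As a rough illustration of that defensive no-op path (a sketch with hypothetical method and map names, not the exact patch):

// Sketch: a late callback that finds its state entry already cleaned logs and
// returns, instead of throwing NullPointerException or rebuilding distributed state.
public void updateTaskState(TaskGroupLocation location, ExecutionState targetState) {
    Object currentState = runningJobStateIMap.get(location);
    if (currentState == null) {
        // State maps were already cleaned after node failure; treat as a no-op.
        log.warn("Ignore transition to {} for {}: state already cleaned", targetState, location);
        return;
    }
    // ... normal state-transition logic ...
}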

It also adds:

  • targeted regression tests for terminal tombstone behavior
  • a delay-based cleanup regression test
  • an engine E2E scenario for the BATCH + no checkpoint + job.retry.times=0 no-restore path

Does this PR introduce any user-facing change?

No user-facing API/config change in normal operation. This improves failure handling so jobs are less likely to hang in an intermediate state after node failure.

How was this patch tested?

Verified locally:

  • ./mvnw -nsu -pl seatunnel-engine/seatunnel-engine-common spotless:check
  • ./mvnw -nsu -pl seatunnel-engine/seatunnel-engine-server spotless:check
  • ./mvnw -nsu -pl seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base spotless:check

Additional notes:

  • Added targeted regression tests: StateTransitionCleanupTest, JobStateCleanupDelayTest
  • Added engine E2E coverage: ClusterFailureNoRestoreIT
  • Full Maven test/compile validation in this checkout is currently blocked by unrelated upstream build issues in other modules (for example, seatunnel-engine-server references missing types in the current checkout, and reactor builds are blocked by seatunnel-config-shade compilation issues), so this PR remains a draft.

@zhangshenghang marked this pull request as ready for review April 1, 2026 09:52
@DanielLeens left a comment

I agree with the tombstone approach for late callbacks, but I think the delayed cleanup currently loses its cleanup owner across a second master failover.

scheduleRemoveJobStateMaps() removes runningJobInfoIMap immediately and then schedules removeJobStateMaps() only in the local monitorService. If that master dies during the delay window, the scheduled task disappears with it. When the next master restores, there is no runningJobInfoIMap entry left to rediscover this job, and restoreJobFromMasterActiveSwitch() just returns for terminal states.

That leaves the terminal entries in runningJobStateIMap / runningJobStateTimestampsIMap orphaned permanently. I think the delayed-cleanup intent needs to be persisted in distributed state (or another recoverable cleanup record), otherwise this closes the race only as long as the same master survives until the timer fires.
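
One way to persist that intent, as a sketch with hypothetical names (later revisions of this PR adopt a similar record-based approach):

// Sketch: store the delayed-cleanup intent in a distributed map so the next master
// can rediscover and finish the cleanup even if the scheduling master dies.
IMap<Long, JobCleanupRecord> pendingJobCleanupIMap =
        nodeEngine.getHazelcastInstance().getMap("engine_pending_job_cleanup");

void scheduleRemoveJobStateMaps(long jobId, JobCleanupRecord record) {
    pendingJobCleanupIMap.put(jobId, record); // survives master failover
    monitorService.schedule(
            () -> processPendingJobCleanup(jobId, record),
            stateCleanupDelayMillis,
            TimeUnit.MILLISECONDS);
}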

@DanielLeens left a comment

Thanks for the update. I re-reviewed the latest HEAD locally, and I still see the same failover hole during the delayed-cleanup window.

cleanJob() still calls scheduleRemoveJobStateMaps() (JobMaster.java:778-782), and that method still removes runningJobInfoIMap immediately (JobMaster.java:644-647) before scheduling the delayed cleanup on the local monitorService (JobMaster.java:658-672). But master-switch restore only scans runningJobInfoIMap.entrySet() (CoordinatorService.java:636-642, 665-681).

So if the active master dies before the delayed task fires, the next master has no distributed record left to rediscover this terminal job, and the remaining state maps are still orphaned. The new end-state guard in restoreJobFromMasterActiveSwitch() does not close that gap, because it only runs for jobs that still have a runningJobInfoIMap entry.

The new JobStateCleanupDelayTest currently asserts that runningJobInfoIMap is already null immediately after terminal completion, which effectively codifies the same gap instead of covering the second-master-failover case.

I think the delayed-cleanup intent still needs to be persisted in recoverable distributed state, or runningJobInfoIMap needs to remain until delayed cleanup actually executes, before this can merge.

@DanielLeens left a comment

Thanks for the update. I pulled the latest HEAD locally and re-reviewed the delayed-cleanup path.

I still see the same blocking failover hole during the cleanup-delay window. cleanJob() still calls scheduleRemoveJobStateMaps(), and that method still removes runningJobInfoIMap immediately before scheduling the delayed cleanup only on the local monitorService. But master-switch restore still discovers jobs only by scanning runningJobInfoIMap. So if the active master dies before the delayed task fires, the next master has no distributed record left to rediscover this terminal job, and the remaining state maps are still orphaned.

The new end-state guard in restoreJobFromMasterActiveSwitch() does not close that gap because it only runs for jobs that still have a runningJobInfoIMap entry. The new JobStateCleanupDelayTest currently asserts that runningJobInfoIMap is already null during the delay window, which codifies the same hole instead of covering the second-master-failover case.

I think the delayed cleanup intent still needs to be persisted in recoverable distributed state, or runningJobInfoIMap needs to stay until the delayed cleanup actually executes. After that, this will be much closer.

@DanielLeens left a comment

I pulled the latest HEAD locally again and I still see the same blocking failover hole during the cleanup-delay window.

JobMaster.scheduleRemoveJobStateMaps() still removes runningJobInfoIMap immediately before scheduling the delayed cleanup only on the local monitorService. But master-switch restore still discovers jobs by scanning runningJobInfoIMap in CoordinatorService.restoreAllRunningJobFromMasterNodeSwitch(). So if the active master dies before the delayed task fires, the next master still has no distributed record left to rediscover this terminal job, and the remaining state maps can still be orphaned.

The new stateCleanupDelayMillis=0 test config and the late-checkpoint guard do not close that gap, because they do not persist the delayed-cleanup intent across a second master failover.

I still think this needs one of these two directions before merge:

  • keep runningJobInfoIMap until the delayed cleanup actually executes, or
  • persist the delayed-cleanup intent in recoverable distributed state.

After that, I am happy to re-review.

@DanielLeens left a comment

I pulled the latest HEAD locally again and re-checked the terminal-cleanup / master-switch path.

The previous failover hole looks closed now: JobMaster.scheduleRemoveJobStateMaps() persists a JobCleanupRecord in IMAP_PENDING_JOB_CLEANUP, CoordinatorService.restoreJobFromMasterActiveSwitch() reschedules terminal cleanup instead of dropping the job blindly, and the REST / overview paths filter delayed-cleanup tombstones so finished jobs are not shown as running. With the new unit / E2E coverage around delayed cleanup and no-restore cluster failure, I do not see the previous blocker in the current revision.

@DanielLeens left a comment

Thanks for the latest update. I re-reviewed the current head locally as seatunnel-review-10687 at commit 17750abb9, comparing it with upstream/dev.

What This PR Fixes

  • User pain: after terminal job/pipeline state cleanup, late asynchronous callbacks or active-master switch recovery can observe missing runtime state and either report misleading errors or leave stale job metadata behind.
  • Fix approach: the PR delays terminal job-state cleanup, records pending cleanup metadata in Hazelcast, reschedules cleanup after master failover, and prevents a new non-savepoint submission from reusing a job id while the old terminal state is still waiting for cleanup.
  • One-line value: terminal state becomes a short-lived tombstone instead of disappearing immediately, which makes late callbacks and master failover safer.

Core Logic Review

Key changed files and methods:

  • CoordinatorService.java: schedulePendingJobCleanup(...), processPendingJobCleanup(...), restoreAllRunningJobFromMasterNodeSwitch(...), and the submitJob(...) pending-cleanup guard.
  • JobMaster.java: createJobCleanupRecord() and scheduleRemoveJobStateMaps().
  • JobCleanupRecord.java: distributed cleanup metadata for job-level state keys.
  • JobInfoService.java: hides terminal jobs that are retained only as cleanup tombstones.

Important before/after point:

// Before: terminal state could be removed immediately, so late callbacks saw null state.
removeJobStateMaps();
// After: terminal state is retained and removed later by a cleanup record.
pendingJobCleanupIMap.put(jobId, cleanupRecord);
coordinatorService.schedulePendingJobCleanup(jobId, cleanupRecord);

The normal Zeta lifecycle does hit this change:

Job reaches terminal state
  -> PhysicalPlan.addPipelineEndCallback()
      -> JobMaster.initStateFuture() completion handler
          -> JobMaster.cleanJob()
              -> createJobCleanupRecord()
              -> pendingJobCleanupIMap.put(jobId, record)
              -> CoordinatorService.schedulePendingJobCleanup(jobId, record)

Delayed cleanup
  -> CoordinatorService.processPendingJobCleanup(jobId, record)
      -> verifies initializationTimestamp still matches current JobInfo
      -> verifies current job state is terminal
      -> removes runningJobInfoIMap
      -> removes recorded state/timestamp keys

Active-master switch before cleanup fires
  -> CoordinatorService.restoreAllRunningJobFromMasterNodeSwitch()
      -> terminal-state zombie jobs are pre-filtered
      -> restoreJobFromMasterActiveSwitch()
          -> reschedules pending cleanup if a cleanup record exists
          -> otherwise removes stale runningJobInfoIMap

Local static verification:

  • git diff --stat upstream/dev...seatunnel-review-10687: 25 files changed, +1272/-45.
  • git diff --name-status upstream/dev...seatunnel-review-10687: Zeta coordinator/job-master cleanup code, serialization hook, REST visibility, tests, and one E2E are touched.
  • gh pr checks 10687: Build is CANCELLED; label and notification checks are successful.
  • Local build/tests: not run. This review is based on the local branch, full diff, and the job lifecycle / failover call-chain inspection.

Compatibility, Side Effects, Errors, and Logs

Compatibility impact: mostly compatible, with one intentional operational behavior change. Finished jobs are retained in runtime maps for state-cleanup-delay-ms before cleanup, while JobInfoService.shouldShowAsRunningJob() prevents these tombstones from showing as running jobs. No public API, protocol, or serialization format used by clients is removed, but a new internal Hazelcast data type is added via ResourceDataSerializerHook.
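
The visibility filter could look roughly like this (a sketch; only the method name comes from the diff, the logic is my assumption):

// Sketch: hide terminal jobs retained only as delayed-cleanup tombstones from the
// running-jobs view. Names follow the review text; the exact logic is assumed.
boolean shouldShowAsRunningJob(JobStatus status, JobCleanupRecord pendingCleanup) {
    // A terminal job kept alive only by a pending cleanup record is a tombstone,
    // not a running job.
    return !(status.isEndState() && pendingCleanup != null);
}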

Performance and side effects: the default 60s retention adds bounded temporary IMap entries for terminal jobs/pipelines/tasks and checkpoint state keys. Cleanup scheduling uses the existing monitor service and should not add hot-path CPU/network cost. The owner timestamp guard is important and is present, so delayed cleanup should not remove a newly submitted job with the same id.
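
A minimal sketch of that owner guard, with hypothetical field and method names:

// Sketch: the cleanup record captures the submission timestamp of the job it was
// created for, so delayed cleanup never deletes a newer job that reused the same id.
void processPendingJobCleanup(long jobId, JobCleanupRecord record) {
    JobInfo currentJobInfo = runningJobInfoIMap.get(jobId);
    if (currentJobInfo != null
            && currentJobInfo.getInitializationTimestamp()
                    != record.getInitializationTimestamp()) {
        // A new job now owns this id; keep its state and drop only the stale record.
        pendingJobCleanupIMap.remove(jobId);
        return;
    }
    // ... remove the owner entry, then the recorded state/timestamp keys ...
}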

Error handling and logs: cleanup failures are logged and retried through retained cleanup records. Late state transitions now log and skip when the state entry is already missing or terminal, instead of trying to force an invalid transition.

Findings

I did not find a new source-level blocker in the latest code. The remaining blocker is CI status.

Merge Conclusion

Conclusion: can merge after CI is rerun successfully

  1. Blocking items:
    • CI: Build is currently CANCELLED on the latest head (17750abb9). This must be rerun and pass before merge.
  2. Suggested non-blocking items:
    • None from this latest source review.

Overall assessment: the design is a reasonable long-term fix for the terminal-state cleanup race. It keeps state retention bounded, records enough ownership data to avoid deleting a newer job, and includes targeted unit coverage for cleanup ownership, submit blocking, delayed cleanup, and late state transitions. Once Build is green, I think this PR can move forward.

@DanielLeens

Hi @zhangshenghang, I re-pulled the latest head locally as seatunnel-review-10687 at 76ab078cd758223b2e3b2dbbb26f186a89c947c3.

The new delta I checked is focused on the master-switch cleanup edge case:

CoordinatorService.restoreJobFromMasterActiveSwitch(jobId, jobInfo)
  -> terminal job found during active-master reconciliation
      -> if cleanup record exists: schedule delayed cleanup
      -> if cleanup record is missing: cleanupTerminalZombieJob(jobId)
          -> remove runningJobInfoIMap
          -> remove runningJobStateIMap
          -> remove runningJobStateTimestampsIMap

This closes the zombie terminal-state hole better than only removing runningJobInfoIMap, because the REST/status maps will not keep stale terminal state after the info entry is gone. The added test covers this missing-cleanup-record branch.

Conclusion: can merge after fixes

I do not see a new static code blocker in the latest delta. Please wait for the currently running Build check to finish green before merge.

@DanielLeens

Hi @zhangshenghang, I rechecked the current PR head locally as seatunnel-review-10687 at de2b568e836b. I reviewed the full diff against upstream/dev and did not run local Maven/tests in this batch; this is a source-level review.

I rechecked the master-switch cleanup path after the latest update:

CoordinatorService.restoreJobFromMasterActiveSwitch(jobId, jobInfo)
  -> terminal job found during active-master reconciliation
      -> cleanup record exists: schedule delayed cleanup
      -> cleanup record missing: cleanupTerminalZombieJob(jobId)
          -> remove runningJobInfoIMap
          -> remove runningJobStateIMap
          -> remove runningJobStateTimestampsIMap

The latest code closes the stale terminal-state map gap better than only removing runningJobInfoIMap. I do not see a new static blocker in the source-level review. The fetched CI metadata reports Build: CANCELLED, so please rerun it.

Conclusion: can merge after fixes

Blocking item:

  1. Rerun CI and merge only after Build is green.

@DanielLeens

Hi @zhangshenghang, thanks for working on this tricky Zeta cleanup race. I re-reviewed the latest head locally on seatunnel-review-10687 (84ad22224, base 60f1007ab) and traced the terminal-state cleanup path after node failure.

What This PR Fixes

  • User pain point: after node failure, late async callbacks can interact badly with terminal job/pipeline/task states, and early cleanup can make recovery or diagnosis see missing state.
  • Fix approach: keep terminal-state tombstones for a configurable delay (state-cleanup-delay-ms) and record the exact state/timestamp keys in JobCleanupRecord (sketched after this list) before delayed cleanup.
  • One-sentence value: this PR moves Zeta cleanup toward a safer delayed-tombstone model.
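
Roughly, the cleanup record captures the following shape (field names are my paraphrase of the reviewed code, not verbatim):

// Sketch of JobCleanupRecord as described in this review; names are illustrative.
// It is Hazelcast-serializable so it survives active-master failover.
public class JobCleanupRecord implements IdentifiedDataSerializable {
    private long jobId;
    private long initializationTimestamp; // ownership guard against job-id reuse
    private List<Object> stateKeys;       // keys to drop from runningJobStateIMap
    private List<Object> timestampKeys;   // keys to drop from runningJobStateTimestampsIMap
    // getters, setters, and serialization methods omitted
}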

Core Flow Reviewed

Job reaches terminal state
  -> JobMaster.cleanJob() [JobMaster.java:798-802]
  -> createJobCleanupRecord() collects job/pipeline/task/checkpoint state keys [JobMaster.java:636-677]
  -> pendingJobCleanupIMap.put(jobId, cleanupRecord) [JobMaster.java:680-685]
  -> CoordinatorService.schedulePendingJobCleanup() schedules by state-cleanup-delay-ms [CoordinatorService.java:664-693]

Delayed cleanup execution
  -> processPendingJobCleanup(jobId, record) [CoordinatorService.java:696-731]
      -> read currentJobInfo from runningJobInfoIMap [L708]
      -> if currentJobInfo == null, remove cleanup record and return [L708-711]
      -> if owner matches, remove runningJobInfo [L718]
      -> remove state/timestamp keys [L729-730]
      -> remove cleanup record [L731]

Failure window
  -> runningJobInfoIMap.remove(...) succeeds [L718]
  -> process crashes before removeKeys(...) [L729-730]
  -> next cleanup attempt sees currentJobInfo == null [L708]
  -> it removes the pending cleanup record [L710]
  -> recorded state/timestamp keys are never cleaned

Findings

Issue 1: Cleanup can drop the pending record after runningJobInfo is gone but before recorded state keys are removed

  • Location: seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:708
  • Severity: High

processPendingJobCleanup() treats currentJobInfo == null as “nothing left to clean” and removes the pending cleanup record. But this method itself removes runningJobInfoIMap before removing runningJobStateIMap and runningJobStateTimestampsIMap. If the member fails between those steps, the next attempt will delete the cleanup record and skip the recorded keys entirely.

Risk: terminal job/pipeline/task/checkpoint state keys can remain in distributed maps indefinitely. That undermines the durability/idempotency goal of the delayed cleanup record.

Suggested fix: let JobCleanupRecord drive final cleanup. If currentJobInfo == null, still remove record.getStateKeys() and record.getTimestampKeys(), then remove the pending record. The owner check is important when current job info exists; it should not prevent idempotent cleanup of keys already captured in the record after the owner info is gone.

Compatibility

  • Compatible from the user API/config perspective.
  • New config: state-cleanup-delay-ms, default 60000.
  • Behavior change: terminal state is retained longer before cleanup, which is intentional.
  • Internal serialization: adds JobCleanupRecord; no user data format change.

Performance And Side Effects

  • Extra Hazelcast map entries are retained for the configured delay.
  • Cleanup records grow with job/pipeline/task/checkpoint key count.
  • No obvious CPU or lock-contention issue.
  • The remaining risk is resource/state leakage under the failure window above.

Tests And Docs

The new tests cover the happy path and terminal transition protection well. Please add one crash/idempotency-style test where runningJobInfoIMap is already missing but the pending cleanup record still exists; state/timestamp keys should still be removed.
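
A rough shape of such a test (a sketch; helper names and fixtures are assumptions, not the project's exact test utilities):

// Sketch: simulate a crash window where the owner entry is already gone but the
// pending cleanup record survived, then assert the recorded keys are still drained.
@Test
void pendingCleanupStillRemovesRecordedKeysWhenJobInfoAlreadyGone() {
    JobCleanupRecord record = recordWithKeys(jobId, stateKeys, timestampKeys);
    pendingJobCleanupIMap.put(jobId, record);
    runningJobInfoIMap.remove(jobId); // owner removed, crash before key cleanup

    coordinatorService.processPendingJobCleanup(jobId, record);

    stateKeys.forEach(key -> assertNull(runningJobStateIMap.get(key)));
    timestampKeys.forEach(key -> assertNull(runningJobStateTimestampsIMap.get(key)));
    assertNull(pendingJobCleanupIMap.get(jobId)); // record dropped only after key cleanup
}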

Merge Conclusion

Conclusion: merge after fixes

  1. Blocking items:
    • Issue 1: make pending cleanup idempotent after runningJobInfo has already been removed.
  2. Non-blocking suggestions:
    • Add the targeted test above so this failure window stays closed.

CI Build is currently green (72463408118), and the design direction is good. I would fix the idempotent cleanup window before merging, because it is exactly the kind of node-failure edge case this PR is meant to protect.

@DanielLeens

What This PR Fixes

  • User pain point: After worker loss or master switch, cleaning terminal state too early can make late callbacks read missing state and leave jobs stuck or dirty state behind.
  • Fix approach: This PR uses delayed cleanup plus persisted JobCleanupRecord plus master-failover rescheduling, and the latest commit makes the delayed cleanup idempotent.
  • One-line summary: The current head closes the most dangerous “owner removed but state keys not cleaned” window.

1. Code Review

1.1 Core Logic Analysis

  • Exact change: The key logic is in CoordinatorService.java:708-733, together with the cleanup-record construction path in JobMaster and the new cleanup tests.
  • Before / After snippets:

// Before:
if (currentJobInfo == null) {
    removePendingJobCleanupRecord(jobId, record);
    return;
}

// After:
if (currentJobInfo == null) {
    cleanupPendingJobStateMaps(record);
    removePendingJobCleanupRecord(jobId, record);
    return;
}
  • Key findings:
    • CoordinatorService.java:708-711 now still cleans the recorded state keys even when runningJobInfoIMap is already empty.
    • CoordinatorService.java:719-723 also covers the remove race: if owner removal fails but the latest owner is now null, it still cleans the recorded state / timestamp keys.
    • That directly addresses the earlier “owner removed first, state-key cleanup lost afterward” failure window.
  • Correctness analysis: The latest commit ("Make delayed cleanup idempotent") fixes the exact blocker from the previous review. As long as the cleanup record still exists, later attempts no longer give up just because runningJobInfo is gone, so delayed cleanup finally becomes record-driven and idempotent.
  • Runtime path reviewed:
Job reaches terminal state
  -> JobMaster creates JobCleanupRecord and persists it
  -> CoordinatorService.processPendingJobCleanup() [CoordinatorService.java:700-733]
  -> if owner info still exists, remove owner then clean recorded state keys
  -> if owner info is already gone, still clean recorded state keys [708-711]
  -> if remove(jobId, currentJobInfo) races and latest owner becomes null, still clean keys [719-723]

1.2 Compatibility Impact

  • Fully compatible. This is an internal cleanup-record / delayed-cleanup improvement; there is no user API, default-value, or data-protocol break.

1.3 Performance / Side Effects

  • The design intentionally keeps some state longer during the delay window. I do not see a new hotspot lock or unbounded growth risk here.

1.4 Error Handling and Logs

Issue 1: The current Build check is still running

  • Location: GitHub Build 72697112897
  • Description: From the latest code, the earlier delayed-cleanup idempotency blocker is fixed. The only open gate is that the current Build is still IN_PROGRESS.
  • Risk: If CI ultimately fails, the cleanup fix still cannot be merged safely.
  • Best suggestion: Wait for the current Build to finish green.
  • Severity: Medium

2. Code Quality Assessment

2.1 Code Style

  • The fix is targeted and stays inside the cleanup-record boundary instead of spreading into unrelated modules.

2.2 Test Coverage

  • This branch already carries substantial cleanup unit coverage and an E2E; the new idempotency fix lands right in that tested model.

2.3 Documentation

  • This is an internal engine-semantics fix; no extra user docs are needed.

3. Architecture

3.1 Elegance of the Solution

  • This is the patch that turns delayed cleanup into a truly record-driven design.

3.2 Maintainability

  • Once idempotency is in place, future delayed-cleanup changes become much safer.

3.3 Extensibility

  • The same “clean from the record even after the owner is gone” pattern can help other delayed-cleanup maps too.

3.4 Backward Compatibility

  • Backward compatible; it just makes failure-window behavior safer.

4. Issue Summary

No. | Issue                                    | Location                  | Severity
1   | The current Build check is still running | GitHub Build 72697112897  | Medium

5. Merge Conclusion

Conclusion: merge after fixes

  1. Blocking items

    • Issue 1: wait for the latest Build to finish green.
  2. Recommended non-blocking items

    • None.
  • Overall assessment: The latest head fixes the code issue I cared about most. CI is now the only remaining gate.
  • Better alternative: The current fix is the right hole to close; I would not switch back to a heavier owner-retention strategy.
  • Local branch info: seatunnel-review-10687 at HEAD 2acddd5b6226f61be7a129b5532174ede4b95d9d, compared with upstream/dev
  • Local source-level analysis: I rechecked the latest delayed-cleanup idempotency path locally at source level. I did not run Maven in this batch.
  • CI status: Notify test workflow and labeler are green; Build is still running.

davidzollo previously approved these changes Apr 24, 2026

@davidzollo (Contributor) left a comment

+1 if CI passes.
Great job, huge thanks for your contribution

@DanielLeens

Hi @zhangshenghang, I rechecked the current head locally from seatunnel-review-10687 after the latest discussion.

What This PR Fixes

  • User pain point: after worker loss or master switch, cleaning terminal state too early can race with late callbacks and leave jobs stuck or state cleanup incomplete.
  • Fix approach: the PR combines delayed cleanup, persisted JobCleanupRecord, and failover-aware rescheduling.
  • One-line summary: I rechecked the latest head and the latest follow-up activity is approval only, not a code rollback; I do not currently see a new blocker.

1. Code Review

1.1 Core Logic Analysis

Local review basis:

  • Branch: seatunnel-review-10687
  • Head: 2acddd5b6226f61be7a129b5532174ede4b95d9d
  • Merge base with upstream/dev: 06d2d28ee3122057e898cecf986f098b09c10118
  • git diff --stat upstream/dev...HEAD: 33 files changed, +1675 / -55
  • git diff --check upstream/dev...HEAD: clean
  • GitHub checks: Build, Notify test workflow, and labeler are green
  • I did not run local Maven/tests in this batch; this is source-level review plus Git/GitHub metadata

Runtime path I rechecked:

worker/master failure
  -> CoordinatorService / JobMaster detect terminal transition
  -> schedule delayed cleanup
  -> persist JobCleanupRecord
  -> late callback / failover recovery still has state to consult
  -> delayed cleanup re-checks and clears terminal state safely

Key findings:

  • This patch still hits the real Zeta terminal-cleanup path.
  • The latest activity after my previous review is maintainer approval only; I do not see a new code regression on the current head.
  • Current CI is green.

I did not find a blocking issue.

1.2 Compatibility Impact

Verdict: fully compatible.

1.3 Performance / Side Effects

The added delay/recovery logic is not a hot-path amplification, and I do not currently see a new leak or race regression from the latest head.

1.4 Error Handling and Logs

No blocking issue found.

2. Code Quality

2.1 Style

The implementation remains consistent with the engine/server style.

2.2 Tests

The engine/server tests around cleanup delay, follower filtering, and state transition coverage are meaningful for this fix.

2.3 Docs

The Zeta deployment docs were updated in both English and Chinese.

3. Architecture

3.1 Elegance

This is a solid targeted fix.

3.2 Maintainability

The cleanup-record boundary is much easier to reason about than eager terminal deletion.

3.3 Extensibility

It gives a clean base for future terminal-cleanup / failover tightening.

3.4 Historical Compatibility

Fully compatible.

4. Issue Summary

No blocking issue found.

5. Merge Decision

Conclusion: can merge

Blocking items:

  • None

Overall, the current head looks stable to me, and the latest discussion did not introduce a new code risk. I am comfortable with merging this.

@zhangshenghang
Member Author

@dybyte @corgy-w Please help review it when you have time.

@DanielLeens

Hi @zhangshenghang, thanks for the update. I pulled the latest head locally again and rechecked the cleanup / restore path in seatunnel-engine.

What this PR solves

  • User pain: after node failure or active-master switch, terminal job metadata could remain only partially cleaned, which makes restore / cleanup races harder to reason about.
  • Fix approach: CoordinatorService now distinguishes between “cleanup with a pending cleanup record” and “immediate terminal zombie cleanup”, and the immediate path now removes runningJobInfoIMap, runningJobStateIMap, and runningJobStateTimestampsIMap together.
  • One-line summary: this is a real engine recovery-path fix, and the current head looks merge-ready from my side.

Runtime path I checked

active master switch
  -> restoreJobFromMasterActiveSwitch(jobId, jobInfo)
      -> read runningJobStateIMap
      -> terminal state
          -> cleanup record exists -> schedulePendingJobCleanup(...)
          -> no cleanup record -> cleanupTerminalZombieJob(...)
              -> remove runningJobInfoIMap
              -> remove runningJobStateIMap
              -> remove runningJobStateTimestampsIMap
      -> return without restoring JobMaster

Merge conclusion

Conclusion: can merge

  1. Blocking items
    • None from my current code review.
  2. Suggested follow-ups
    • Please keep the final merge gated on green GitHub checks. The current Build is green on this head.

I also rechecked the test coverage direction, especially the terminal-zombie behavior during master switch, and I do not see a new blocker in the current head.

if (cleanupRecord != null) {
    schedulePendingJobCleanup(jobId, cleanupRecord);
} else {
    cleanupTerminalZombieJob(jobId);
}

Contributor

Could you clarify the assumption behind cleanupTerminalZombieJob()?

It removes the running-state IMaps directly, while the normal JobMaster.cleanJob() path also clears checkpoint state and stores job history / finished job state before removing job IMap data. For a terminal zombie job without a pending cleanup record, do we know that those normal cleanup steps have already completed before master switch? If not, could this direct IMap cleanup make the job disappear from running state without being fully persisted to history?

public void cleanJob() {
    checkpointManager.clearCheckpointIfNeed(physicalPlan.getJobStatus());
    jobHistoryService.storeJobInfo(jobImmutableInformation.getJobId(), getJobDAGInfo());
    jobHistoryService.storeFinishedJobState(this);
    removeJobIMap();
}

if (runningJobInfo != null) {
    return restoreJobDAGInfo(runningJobInfo);
}

Contributor

Could you please add a test case?

Comment on lines +1420 to +1426

JobImmutableInformation jobImmutableInformation =
        nodeEngine
                .getSerializationService()
                .toObject(
                        nodeEngine
                                .getSerializationService()
                                .toObject(jobInfo.getJobImmutableInformation()));
Contributor

Is the second toObject() intentional? It looks unnecessary to me.
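
If it is unintentional, the single-deserialization form would presumably be (a sketch based only on the quoted hunk):

// Sketch: one toObject() call, assuming the stored bytes need a single
// deserialization step; whether the nested call is intentional is the open question.
JobImmutableInformation jobImmutableInformation =
        nodeEngine
                .getSerializationService()
                .toObject(jobInfo.getJobImmutableInformation());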

@DanielLeens

Thanks for the follow-up discussion. I re-checked the active-master restore path locally.

What this PR solves

  • User pain: delayed cleanup and master-switch restore can race with each other after node failure.
  • Fix approach: add pendingJobCleanupIMap plus delayed job-cleanup scheduling so job-level cleanup can survive master handoff.
  • One-line value: the direction is right, but the terminal-job fallback branch still skips the normal checkpoint and history cleanup contract.

Runtime path

master active switch
  -> CoordinatorService.restoreJobFromMasterActiveSwitch() [946-954]
     -> if jobState is end-state
        -> pendingJobCleanupIMap.get(jobId) [947-948]
        -> cleanupRecord != null ? schedulePendingJobCleanup() : cleanupTerminalZombieJob() [949-952]

fallback branch
  -> cleanupTerminalZombieJob(jobId) [984-992]
     -> remove runningJobInfoIMap
     -> remove runningJobStateIMap
     -> remove runningJobStateTimestampsIMap

normal terminal cleanup contract
  -> JobMaster.cleanJob() [798-803]
     -> checkpointManager.clearCheckpointIfNeed(...)
     -> jobHistoryService.storeJobInfo(...)
     -> jobHistoryService.storeFinishedJobState(...)
     -> scheduleRemoveJobStateMaps()

Review findings

Issue 1: The fallback branch for terminal jobs without a cleanup record still skips the normal checkpoint and history cleanup contract

  • Location: CoordinatorService.java:946-953, :984-992; JobMaster.java:798-803
  • Why this matters: after master switch, a terminal job can still land in the fallback branch before the previous master finishes writing the cleanup record.
  • Risk: checkpoint leftovers, missing finished-job history, and incomplete terminal-state persistence.
  • Better fix: replay the essential cleanJob semantics in the fallback terminal-cleanup path instead of deleting only the running-state maps.

Merge conclusion

Conclusion: Merge after fixes

Blocking items:

  • Issue 1 should be fixed before merge.

Non-blocking suggestions:

  • The toObject remark from the discussion looks low-risk compared with the restore-contract problem above, so I would not block merge on that one.

CI status:

  • Build is passing, so the remaining blocker is code semantics, not CI.

@DanielLeens left a comment

Hi @zhangshenghang, thanks for the latest follow-up. I pulled the newest head locally again and re-checked the terminal-job fallback path after master switch.

First, the new delta does fix part of the gap I called out earlier:

  • the fallback branch now persists job history when the cleanup record is missing
  • and it now cleans the recorded running-state / timestamp / checkpoint-state map keys instead of only dropping runningJobInfoIMap

So this is definitely moving in the right direction.

What this PR solves:

  • User pain: after node failure and active-master switch, terminal-job cleanup can race with restore and leave behind zombie runtime state.
  • Fix approach: keep delayed cleanup metadata in pendingJobCleanupIMap, reschedule cleanup after master handoff, and add a fallback cleanup path when the cleanup record is missing.
  • One-line summary: the overall direction is correct, but the fallback branch still does not fully replay the normal terminal cleanup contract.

Runtime chain I re-verified:

master active switch
  -> CoordinatorService.restoreJobFromMasterActiveSwitch() [CoordinatorService.java:928-955]
     -> if jobState is end-state
        -> pendingJobCleanupIMap.get(jobId)
        -> cleanupRecord != null
             -> schedulePendingJobCleanup(...)
           else
             -> cleanupTerminalZombieJob(jobId, jobInfo, finalStatus) [985-989]
                -> persistTerminalZombieHistoryIfNecessary(...) [1439-1459]
                -> cleanupPendingJobStateMaps(createTerminalZombieCleanupRecord(...))
                -> runningJobInfoIMap.remove(jobId)

normal terminal cleanup
  -> JobMaster.cleanJob() [JobMaster.java:798-803]
     -> checkpointManager.clearCheckpointIfNeed(jobStatus) [799]
        -> checkpointStorage.deleteCheckpoint(jobId) [CheckpointManager.java:242-247]
        -> checkpointMonitorService.cleanupJob(jobId) [CheckpointManager.java:248-250]
     -> jobHistoryService.storeJobInfo(...) [800]
     -> jobHistoryService.storeFinishedJobState(this) [801]
     -> scheduleRemoveJobStateMaps() [802]

The remaining blocker is:

  1. The terminal-zombie fallback still skips the checkpoint cleanup part of the normal terminal contract.
    • The new cleanupTerminalZombieJob(...) now persists history and removes running-state keys, which is good.
    • But it still does not perform the CheckpointManager.clearCheckpointIfNeed(...) semantics from the normal JobMaster.cleanJob() path.
    • That means checkpointStorage.deleteCheckpoint(jobId) and checkpointMonitorService.cleanupJob(jobId) can still be missed when the new master hits this fallback branch before the old master finishes the normal cleanup sequence.

Why I still consider this blocking:

  • this PR is specifically about making the terminal cleanup / restore path safe under node failure
  • leaving checkpoint artifacts behind means the fallback branch is still only partially equivalent to the normal terminal cleanup path
  • for seatunnel-engine, that is still a recovery-path contract gap

Suggested fix order:

  • Preferred: extract the shared terminal-finalization contract into one reusable helper (sketched below) so both JobMaster.cleanJob() and cleanupTerminalZombieJob(...) execute the same checkpoint/history/state cleanup semantics.
  • Minimal patch: explicitly add the missing checkpoint storage + checkpoint monitor cleanup in the fallback branch, with the same FINISHED / CANCELED and savepoint-end semantics as the normal path.
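
A sketch of the preferred shared-helper direction (all names and the signature are hypothetical, not the actual patch):

// Sketch: one terminal-finalization helper used by both JobMaster.cleanJob() and the
// master-switch fallback, so the two paths cannot drift apart.
void finalizeTerminalJob(
        long jobId, JobStatus status, JobDAGInfo dagInfo, JobCleanupRecord record) {
    checkpointManager.clearCheckpointIfNeed(status); // deleteCheckpoint + monitor cleanup
    jobHistoryService.storeJobInfo(jobId, dagInfo);
    jobHistoryService.storeFinishedJobState(jobId);  // whatever snapshot history needs
    cleanupPendingJobStateMaps(record);              // recorded state/timestamp keys
    runningJobInfoIMap.remove(jobId);
}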

Test note:

  • the new test coverage is helpful for history + running-state cleanup
  • but there is still no coverage proving that fallback cleanup also clears checkpoint storage / checkpoint monitor state

CI note:

  • from the current visible checks, this is no longer a CI problem from my side; the remaining blocker is code semantics.

Conclusion: merge after fixes

Blocking items:

  • Make the terminal-zombie fallback replay the checkpoint cleanup part of the normal terminal cleanup contract.

Non-blocking suggestion:

  • Add one targeted regression test for fallback checkpoint cleanup so this edge case stays closed.

Overall, this branch is getting very close. The latest follow-up meaningfully improves the fallback path, but I would still close the checkpoint-cleanup gap before merge.

@github-actions bot removed the reviewed label Apr 28, 2026