Skip to content

[CELEBORN-2312] Support committing uncommitted partitions for graceful shutdown#3668

Open
SteNicholas wants to merge 1 commit intoapache:mainfrom
SteNicholas:CELEBORN-2312
Open

[CELEBORN-2312] Support committing uncommitted partitions for graceful shutdown#3668
SteNicholas wants to merge 1 commit intoapache:mainfrom
SteNicholas:CELEBORN-2312

Conversation

@SteNicholas
Copy link
Copy Markdown
Member

@SteNicholas SteNicholas commented Apr 20, 2026

What changes were proposed in this pull request?

Support the worker to proactively commit uncommitted partitions during graceful shutdown, controlled by a new configuration celeborn.worker.graceful.shutdown.commitUncommittedPartitions.enabled(default false).

Key changes:

  • WorkerPartitionLocationInfo#snapshotUncommittedUniqueIds: Takes a weakly-consistent, point-in-time snapshot of uncommitted partition unique IDs grouped by shuffle key (primary + replica). Uses ConcurrentHashMap iteration semantics - concurrent mutations after the snapshot are not visible.
  • Controller#commitUncommittedPartitions(): Snapshots all uncommitted partitions, commits them in parallel via the existing commitFiles thread pool, waits with shuffleCommitTimeout, then removes successfully committed partitions and releases slots. Failed partitions are intentionally retained so the existing passive LifecycleManager CommitFiles retry path can still handle them.
  • Worker#shutdownGracefully(): Invokes Controller#commitUncommittedPartitions() after shutdown.set(true) when the config is enabled.
  • CelebornConf: New config celeborn.worker.graceful.shutdown.commitUncommittedPartitions.enabled (version 0.7.0, default false).

Why are the changes needed?

During graceful shutdown, the worker currently waits passively for LifecycleManager to send CommitFiles RPCs. This introduces unnecessary shutdown latency in scenarios where:

  1. The LifecycleManager is slow to react (e.g., under GC pressure or network delays).
  2. The LifecycleManager has already deregistered the worker and will not send CommitFiles.
  3. Multiple applications have uncommitted partitions, amplifying the wait time.

By allowing the worker to proactively commit its own partitions, the graceful shutdown window can be significantly shortened while maintaining backward compatibility (opt-in, default off).

Does this PR resolve a correctness bug?

No.

Does this PR introduce any user-facing change?

Yes. A new configuration is introduced:

Config Key Default Value
celeborn.worker.graceful.shutdown.commitUncommittedPartitions.enabled false

How was this patch tested?

  • WorkerPartitionLocationInfoSuite
    • snapshotUncommittedUniqueIds - empty info returns empty maps
    • snapshotUncommittedUniqueIds - captures correct IDs across shuffles
    • snapshotUncommittedUniqueIds - filters empty shuffle keys
    • snapshotUncommittedUniqueIds - snapshot is a point-in-time copy
  • WorkerSuite
    • commitUncommittedPartitions - commits primary and replica partitions
    • commitUncommittedPartitions - no-op when no partitions
    • commitUncommittedPartitions - idempotent on double call
    • commitUncommittedPartitions - retains failed partitions for passive wait
    • commitUncommittedPartitions - commits across multiple shuffle keys
    • commitUncommittedPartitions - no cross-shuffle uniqueId collision
    • commitUncommittedPartitions - cross-shuffle collision with partial failure

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds an opt-in mechanism for Celeborn workers to proactively commit uncommitted partitions during graceful shutdown (to reduce shutdown latency), controlled by a new worker configuration flag.

Changes:

  • Add snapshotUncommittedUniqueIds to snapshot uncommitted partition unique IDs (primary + replica) by shuffle key.
  • Add Controller.commitUncommittedPartitions() and invoke it from Worker.shutdownGracefully() when enabled.
  • Add new config celeborn.worker.graceful.shutdown.commitUncommittedPartitions.enabled and document it; add unit tests for the new behavior.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala Calls proactive commit during graceful shutdown when the new config is enabled.
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala Implements proactive commit flow using existing commitFiles infrastructure and then removes/releases committed partitions.
common/src/main/scala/org/apache/celeborn/common/meta/WorkerPartitionLocationInfo.scala Adds snapshot API for uncommitted partition IDs grouped by shuffle key.
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala Introduces the new configuration entry and accessor.
docs/configuration/worker.md Documents the new worker config flag.
common/src/test/scala/org/apache/celeborn/common/meta/WorkerPartitionLocationInfoSuite.scala Adds tests for the new snapshot behavior.
worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerSuite.scala Adds tests for proactive commit behavior and idempotency/failure retention.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala Outdated
Comment thread worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala Outdated
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala Outdated
Comment thread worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerSuite.scala Outdated
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 7 out of 7 changed files in this pull request and generated 2 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@SteNicholas SteNicholas marked this pull request as ready for review April 21, 2026 06:23
@FMX
Copy link
Copy Markdown
Contributor

FMX commented Apr 21, 2026

@SteNicholas In this PR, are the committed files stored in the RocksDB by the storage manager?

@SteNicholas
Copy link
Copy Markdown
Member Author

@FMX, both following steps are covered by existing code, and Controller#commitUncommittedPartitions does not need to do any extra store operations.

commitUncommittedPartitions()
    → commitFiles() → fileWriter.close()
      → TierWriter.close() 第120行: notifyFileCommitted()
        → storageManager.notifyFileInfoCommitted()  ← Write to committedFileInfos
    ...                                                                                                                                                                                                                                             
  StorageManager.close(WORKER_GRACEFUL_SHUTDOWN)
    → saveAllCommittedFileInfosToDB()                ← Persist to RocksDB  

@FMX
Copy link
Copy Markdown
Contributor

FMX commented Apr 23, 2026

After some investigation, I think there is something wrong with this PR.

@FMX
Copy link
Copy Markdown
Contributor

FMX commented Apr 23, 2026

Scenario : No restart, but worker proactively commits and clears partitions
This is the more subtle case introduced by the PR.

During graceful shutdown, Controller.commitUncommittedPartitions():

  1. snapshots uncommitted uniqueIds from partitionLocationInfo (best-effort snapshot),
  2. commits them via the existing commitFiles helper,
  3. removes successfully committed (or empty) uniqueIds from partitionLocationInfo and releases slots.
    After this proactive commit, the client-side LifecycleManager/CommitManager might still send CommitFiles RPCs for the same shuffle/uniqueIds (e.g., due to timing, retries, or delayed reactions).
    When Controller.commitFiles() runs for those ids, it calls partitionLocationInfo.getPrimaryLocation/getReplicaLocation. If the location is missing (because the proactive flow already removed it), the worker logs an error and treats the id as failed:
  • location == null → failedIds.add(uniqueId).
    The final CommitFiles response becomes PARTIAL_SUCCESS (or similar), which the client treats as a commit failure for those ids.

@FMX
Copy link
Copy Markdown
Contributor

FMX commented Apr 23, 2026

I think you'll need to extend the shuffleCommitInfos and persist it to make sure subsequent CommitFiles requests can be recognized as already committed.

Copy link
Copy Markdown
Contributor

@FMX FMX left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After careful review, I think this PR is not ready.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants