Skip to content

[CELEBORN-1917] Support celeborn.client.push.maxBytesSizeInFlight#3248

Closed
DDDominik wants to merge 11 commits into
apache:mainfrom
DDDominik:CELEBORN-1917
Closed

[CELEBORN-1917] Support celeborn.client.push.maxBytesSizeInFlight#3248
DDDominik wants to merge 11 commits into
apache:mainfrom
DDDominik:CELEBORN-1917

Conversation

@DDDominik

Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

add data size limitation to inflight data by introducing a new configuration: celeborn.client.push.maxBytesInFlight.perWorker/total and defaults to celeborn.client.push.buffer.max.size * celeborn.client.push.maxReqsInFlight.perWorker/total.
for backward compatibility, also add a control: celeborn.client.push.maxReqsInFlight.enabled.

Why are the changes needed?

celeborn do supports limiting the number of push inflight requests via celeborn.client.push.maxReqsInFlight.perWorker/total. this is a good constraint to memory usage where most requests do not exceed celeborn.client.push.buffer.max.size. however, in a vectorized shuffle (like blaze and gluten), a request might be greatly larger then the max buffer size, leading to too much inflight data and results OOM.

Does this PR introduce any user-facing change?

Yes, add new config for client

How was this patch tested?

test on local env

@mridulm

mridulm commented May 9, 2025

Copy link
Copy Markdown
Contributor

+CC @venkata91, @rmcyang

@SteNicholas

Copy link
Copy Markdown
Member

@DDDominik, any update?

@SteNicholas SteNicholas force-pushed the main branch 2 times, most recently from 5590ef0 to 0dffcf6 Compare May 26, 2025 09:57
@turboFei

turboFei commented Jun 2, 2025

Copy link
Copy Markdown
Member

Please update the docs with below command:

    UPDATE=1 build/mvn clean test -pl common -am -Dtest=none -DwildcardSuites=org.apache.celeborn.ConfigurationSuite

@turboFei

turboFei commented Jun 2, 2025

Copy link
Copy Markdown
Member

Have not see this error before, it should be related to this PR.
https://github.com/apache/celeborn/actions/runs/15391457214/job/43306799162?pr=3248

[info] - celeborn spark integration test - resubmit a failed barrier stage across jobs *** FAILED ***
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: ResultStage 5 (collect at CelebornFetchFailureSuite.scala:379) has failed the maximum allowable number of times: 1. Most recent failure reason:
[info] org.apache.spark.shuffle.FetchFailedException: Celeborn FetchFailure appShuffleId/shuffleId: 0/0
[info] 	at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.read(CelebornShuffleReader.scala:96)
[info] 	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:106)
[info] 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
[info] 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
[info] 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
[info] 	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
[info] 	at org.apache.spark.scheduler.Task.run(Task.scala:147)
[info] 	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
[info] 	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
[info] 	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
[info] 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
[info] 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
[info] 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info] 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info] 	at java.base/java.lang.Thread.run(Thread.java:840)
[info] Caused by: org.apache.celeborn.common.exception.CelebornRuntimeException: Get invalid shuffle id -1
[info] 	at org.apache.spark.shuffle.celeborn.SparkUtils.celebornShuffleId(SparkUtils.java:146)
[info] 	at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.read(CelebornShuffleReader.scala:84)
[info] 	... 14 more

https://github.com/apache/celeborn/actions/runs/15391457221/job/43306794218?pr=3248

- celeborn spark integration test - resubmit a failed barrier stage across jobs *** FAILED ***
  org.apache.spark.SparkException: Job aborted due to stage failure: ResultStage 5 (collect at CelebornFetchFailureSuite.scala:379) has failed the maximum allowable number of times: 1. Most recent failure reason:
org.apache.spark.shuffle.FetchFailedException: Celeborn FetchFailure appShuffleId/shuffleId: 0/0
	at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.read(CelebornShuffleReader.scala:96)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:106)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.celeborn.common.exception.CelebornRuntimeException: Get invalid shuffle id -1
	at org.apache.spark.shuffle.celeborn.SparkUtils.celebornShuffleId(SparkUtils.java:146)
	at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.read(CelebornShuffleReader.scala:84)
	... 14 more

@FMX FMX left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This RP can be helpful when there is memory pressure on executors.

@github-actions

github-actions Bot commented Jul 3, 2025

Copy link
Copy Markdown

This PR is stale because it has been open 20 days with no activity. Remove stale label or comment or this will be closed in 10 days.

@github-actions github-actions Bot added the stale label Jul 3, 2025
@FMX FMX removed the stale label Jul 10, 2025
@SteNicholas SteNicholas changed the title [CELEBORN-1917] supports celeborn.client.push.maxBytesSizeInFlight [CELEBORN-1917] Support celeborn.client.push.maxBytesSizeInFlight Jul 21, 2025
@SteNicholas

Copy link
Copy Markdown
Member

Ping @FMX, @RexXiong, @mridulm.

@SteNicholas SteNicholas requested a review from FMX July 21, 2025 11:20
@SteNicholas SteNicholas force-pushed the main branch 2 times, most recently from e8622e1 to d09b424 Compare July 21, 2025 11:38

@DDDominik DDDominik left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

Comment thread client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuiteJ.java Outdated
Comment thread common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java Outdated
Comment thread common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java Outdated
Comment thread common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java Outdated
Comment thread common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java Outdated
Comment thread client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala Outdated
@SteNicholas SteNicholas requested a review from mridulm July 22, 2025 06:18
Comment thread common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
Comment thread common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@codecov

codecov Bot commented Jul 22, 2025

Copy link
Copy Markdown

Codecov Report

Attention: Patch coverage is 29.00000% with 71 lines in your changes missing coverage. Please review.

Project coverage is 63.56%. Comparing base (8ae9737) to head (a46773d).
Report is 18 commits behind head on main.

Files with missing lines Patch % Lines
.../celeborn/common/write/InFlightRequestTracker.java 11.27% 56 Missing and 7 partials ⚠️
...cala/org/apache/celeborn/common/CelebornConf.scala 71.43% 8 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3248      +/-   ##
==========================================
- Coverage   63.57%   63.56%   -0.00%     
==========================================
  Files         348      351       +3     
  Lines       21300    21571     +271     
  Branches     1879     1914      +35     
==========================================
+ Hits        13539    13710     +171     
- Misses       6781     6861      +80     
- Partials      980     1000      +20     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@RexXiong RexXiong left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, only a nit

Comment thread common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java Outdated

@mridulm mridulm left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just one query, rest looks good - thanks !

Comment on lines +149 to +151
|| (maxInFlightBytesSizeEnabled
&& totalInflightBytes.sum() <= maxInFlightBytesSizeTotal
&& batchBytesSize.sum() <= maxInFlightBytesSizePerWorker)) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My earlier query was, should this be || ?
As in, either total inflight is high, or inflight to a specific worker is high (assuming per worker threshold is lower than total !)

Suggested change
|| (maxInFlightBytesSizeEnabled
&& totalInflightBytes.sum() <= maxInFlightBytesSizeTotal
&& batchBytesSize.sum() <= maxInFlightBytesSizePerWorker)) {
|| (maxInFlightBytesSizeEnabled && (
totalInflightBytes.sum() <= maxInFlightBytesSizeTotal ||
batchBytesSize.sum() <= maxInFlightBytesSizePerWorker))) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the PR got merged before this was addressed.
If this is not a concern, that should be fine - else let us do a follow up.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid this comment being ignored, I will submit a PR first

@FMX FMX left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks. Merged into main(v0.7.0).

@FMX FMX closed this in 0ed590d Jul 22, 2025
val CLIENT_PUSH_MAX_BYTES_SIZE_IN_FLIGHT_PERWORKER: OptionalConfigEntry[Long] =
buildConf("celeborn.client.push.maxBytesSizeInFlight.perWorker")
.categories("client")
.version("0.6.1")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merging to 0.6.1 as well

turboFei pushed a commit that referenced this pull request Jul 22, 2025
### What changes were proposed in this pull request?
add data size limitation to inflight data by introducing a new configuration: `celeborn.client.push.maxBytesInFlight.perWorker/total` and defaults to `celeborn.client.push.buffer.max.size * celeborn.client.push.maxReqsInFlight.perWorker/total`.
for backward compatibility, also add a control: `celeborn.client.push.maxReqsInFlight.enabled`.

### Why are the changes needed?
celeborn do supports limiting the number of push inflight requests via `celeborn.client.push.maxReqsInFlight.perWorker/total`. this is a good constraint to memory usage where most requests do not exceed `celeborn.client.push.buffer.max.size`. however, in a vectorized shuffle (like blaze and gluten), a request might be greatly larger then the max buffer size, leading to too much inflight data and results OOM.

### Does this PR introduce _any_ user-facing change?
Yes, add new  config for client

### How was this patch tested?
test on local env

Closes #3248 from DDDominik/CELEBORN-1917.

Lead-authored-by: DDDominik <1015545832@qq.com>
Co-authored-by: SteNicholas <programgeek@163.com>
Co-authored-by: DDDominik <zhuangxian@kuaishou.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
(cherry picked from commit 0ed590d)
Signed-off-by: Wang, Fei <fwang12@ebay.com>
@turboFei

Copy link
Copy Markdown
Member

Merged into branch-0.6(0.6.1)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants