Skip to content

[CELEBORN-1855] LifecycleManager return appshuffleId for non barrier stage when fetch fail has been reported#3090

Closed
buska88 wants to merge 8 commits into
apache:mainfrom
buska88:celeborn-1855
Closed

[CELEBORN-1855] LifecycleManager return appshuffleId for non barrier stage when fetch fail has been reported#3090
buska88 wants to merge 8 commits into
apache:mainfrom
buska88:celeborn-1855

Conversation

@buska88

@buska88 buska88 commented Feb 7, 2025

Copy link
Copy Markdown

What changes were proposed in this pull request?

for non barrier shuffle read stage, LifecycleManager#handleGetShuffleIdForApp always return appshuffleId whether fetch status is true or not.

Why are the changes needed?

As described in jira, If LifecycleManager only returns appshuffleId whose fetch status is success, the task will fail directly to "there is no finished map stage associated with", but previous fetch fail event reported may not be fatal.So just give it a chance

Does this PR introduce any user-facing change?

How was this patch tested?

@cxzl25 cxzl25 changed the title [CELEBORN-1855] LifecycleManager return appshuffleId for non barrier … [CELEBORN-1855] LifecycleManager return appshuffleId for non barrier stage when fetch fail has been reported Feb 8, 2025
@FMX

FMX commented Feb 11, 2025

Copy link
Copy Markdown
Contributor

According to your Jira ticket, "that shuffle fetch fails does not lead to stage fail because task speculation and another attempts succeed", I think the quoted scenario should not happen if you have PR #3080 and #2921.

Do you have these two PRs for your Celeborn client?

@buska88

buska88 commented Feb 12, 2025

Copy link
Copy Markdown
Author

According to your Jira ticket, "that shuffle fetch fails does not lead to stage fail because task speculation and another attempts succeed", I think the quoted scenario should not happen if you have PR #3080 and #2921.

Do you have these two PRs for your Celeborn client?

No.We mainly use branch-0.5.Would you please consider that merging these two pr into branch-0.5-rc?Those are important features and many users using branch-0.5 may need them.

@buska88

buska88 commented Feb 12, 2025

Copy link
Copy Markdown
Author

According to your Jira ticket, "that shuffle fetch fails does not lead to stage fail because task speculation and another attempts succeed", I think the quoted scenario should not happen if you have PR #3080 and #2921.
Do you have these two PRs for your Celeborn client?

No.We mainly use branch-0.5.Would you please consider that merging these two pr into branch-0.5-rc?Those are important features and many users using branch-0.5 may need them.

As for this pr, it occurs to me that when a task of a stage throws fetchFail exception, in the small duration between fetchFail exception reported and the stage is aborted, following read tasks cans still get appId and finish shuffle reading.Then when stage retrying, these tasks may not re compute, which save resources.If following tasks fail due to failing to get appId, then they will be recomputed in the next stage-retry inevitablely.
I think this pr has a tiny influence when we have those two prs you mentioned, so it seems an unnecessary pr.

@FMX

FMX commented Feb 13, 2025

Copy link
Copy Markdown
Contributor

According to your Jira ticket, "that shuffle fetch fails does not lead to stage fail because task speculation and another attempts succeed", I think the quoted scenario should not happen if you have PR #3080 and #2921.
Do you have these two PRs for your Celeborn client?

No.We mainly use branch-0.5.Would you please consider that merging these two pr into branch-0.5-rc?Those are important features and many users using branch-0.5 may need them.

This PR can be helpful in branch-0.5 because branch-0.5 doesn't have PR #2921. Drafting a new RC for branch-0.5 can be a good idea.

@gaoyajun02

Copy link
Copy Markdown

According to your Jira ticket, "that shuffle fetch fails does not lead to stage fail because task speculation and another attempts succeed", I think the quoted scenario should not happen if you have PR #3080 and #2921.
Do you have these two PRs for your Celeborn client?

No.We mainly use branch-0.5.Would you please consider that merging these two pr into branch-0.5-rc?Those are important features and many users using branch-0.5 may need them.

This PR can be helpful in branch-0.5 because branch-0.5 doesn't have PR #2921. Drafting a new RC for branch-0.5 can be a good idea.

Does this PR #2921 also need to be backported to the branch-0.5 branch? @FMX

@FMX

FMX commented Feb 17, 2025

Copy link
Copy Markdown
Contributor

According to your Jira ticket, "that shuffle fetch fails does not lead to stage fail because task speculation and another attempts succeed", I think the quoted scenario should not happen if you have PR #3080 and #2921.
Do you have these two PRs for your Celeborn client?

No.We mainly use branch-0.5.Would you please consider that merging these two pr into branch-0.5-rc?Those are important features and many users using branch-0.5 may need them.

This PR can be helpful in branch-0.5 because branch-0.5 doesn't have PR #2921. Drafting a new RC for branch-0.5 can be a good idea.

Does this PR #2921 also need to be backported to the branch-0.5 branch? @FMX

There are too many conflicts between PR#2921 and branch-0.5. So the PR is not backported.

@FMX

FMX commented Feb 17, 2025

Copy link
Copy Markdown
Contributor

@buska88 It would be better to check the validity of a shuffle Id after you get it instead of using a shuffle ID that marked as invalid.

You can add the check here

public int getShuffleId(
int appShuffleId, String appShuffleIdentifier, boolean isWriter, boolean isBarrierStage) {
return shuffleIdCache.computeIfAbsent(
appShuffleIdentifier,
(id) -> {
PbGetShuffleId pbGetShuffleId =
PbGetShuffleId.newBuilder()
.setAppShuffleId(appShuffleId)
.setAppShuffleIdentifier(appShuffleIdentifier)
.setIsShuffleWriter(isWriter)
.setIsBarrierStage(isBarrierStage)
.build();
PbGetShuffleIdResponse pbGetShuffleIdResponse =
lifecycleManagerRef.askSync(
pbGetShuffleId,
conf.clientRpcRegisterShuffleAskTimeout(),
ClassTag$.MODULE$.apply(PbGetShuffleIdResponse.class));
return pbGetShuffleIdResponse.getShuffleId();
});

@buska88

buska88 commented Feb 20, 2025

Copy link
Copy Markdown
Author

@buska88 It would be better to check the validity of a shuffle Id after you get it instead of using a shuffle ID that marked as invalid.

You can add the check here

public int getShuffleId(
int appShuffleId, String appShuffleIdentifier, boolean isWriter, boolean isBarrierStage) {
return shuffleIdCache.computeIfAbsent(
appShuffleIdentifier,
(id) -> {
PbGetShuffleId pbGetShuffleId =
PbGetShuffleId.newBuilder()
.setAppShuffleId(appShuffleId)
.setAppShuffleIdentifier(appShuffleIdentifier)
.setIsShuffleWriter(isWriter)
.setIsBarrierStage(isBarrierStage)
.build();
PbGetShuffleIdResponse pbGetShuffleIdResponse =
lifecycleManagerRef.askSync(
pbGetShuffleId,
conf.clientRpcRegisterShuffleAskTimeout(),
ClassTag$.MODULE$.apply(PbGetShuffleIdResponse.class));
return pbGetShuffleIdResponse.getShuffleId();
});

Sorry, I don't understand the purpose of checking shuffle id here. If the shuffle id is invalid, what should we do next?

@RexXiong

Copy link
Copy Markdown
Contributor

@buska88 It would be better to check the validity of a shuffle Id after you get it instead of using a shuffle ID that marked as invalid.
You can add the check here

public int getShuffleId(
int appShuffleId, String appShuffleIdentifier, boolean isWriter, boolean isBarrierStage) {
return shuffleIdCache.computeIfAbsent(
appShuffleIdentifier,
(id) -> {
PbGetShuffleId pbGetShuffleId =
PbGetShuffleId.newBuilder()
.setAppShuffleId(appShuffleId)
.setAppShuffleIdentifier(appShuffleIdentifier)
.setIsShuffleWriter(isWriter)
.setIsBarrierStage(isBarrierStage)
.build();
PbGetShuffleIdResponse pbGetShuffleIdResponse =
lifecycleManagerRef.askSync(
pbGetShuffleId,
conf.clientRpcRegisterShuffleAskTimeout(),
ClassTag$.MODULE$.apply(PbGetShuffleIdResponse.class));
return pbGetShuffleIdResponse.getShuffleId();
});

Sorry, I don't understand the purpose of checking shuffle id here. If the shuffle id is invalid, what should we do next?

Maybe we can also throw FetchFailureException and wait task retry

@buska88

buska88 commented Mar 4, 2025

Copy link
Copy Markdown
Author

@buska88 It would be better to check the validity of a shuffle Id after you get it instead of using a shuffle ID that marked as invalid.
You can add the check here

public int getShuffleId(
int appShuffleId, String appShuffleIdentifier, boolean isWriter, boolean isBarrierStage) {
return shuffleIdCache.computeIfAbsent(
appShuffleIdentifier,
(id) -> {
PbGetShuffleId pbGetShuffleId =
PbGetShuffleId.newBuilder()
.setAppShuffleId(appShuffleId)
.setAppShuffleIdentifier(appShuffleIdentifier)
.setIsShuffleWriter(isWriter)
.setIsBarrierStage(isBarrierStage)
.build();
PbGetShuffleIdResponse pbGetShuffleIdResponse =
lifecycleManagerRef.askSync(
pbGetShuffleId,
conf.clientRpcRegisterShuffleAskTimeout(),
ClassTag$.MODULE$.apply(PbGetShuffleIdResponse.class));
return pbGetShuffleIdResponse.getShuffleId();
});

Sorry, I don't understand the purpose of checking shuffle id here. If the shuffle id is invalid, what should we do next?

Maybe we can also throw FetchFailureException and wait task retry

OKay.I choose to throw FetchFailException in CelebornShuffleReader(shuffleclientImpl doesn't have spark dependency).And you can have a look at it when you have free time cc @FMX @RexXiong

@github-actions

Copy link
Copy Markdown

This PR is stale because it has been open 20 days with no activity. Remove stale label or comment or this will be closed in 10 days.

@github-actions github-actions Bot added the stale label Mar 25, 2025
@RexXiong RexXiong removed the stale label Mar 27, 2025

@RexXiong RexXiong left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very Sorry for the late reply, I missed the message

val shuffleId = SparkUtils.celebornShuffleId(shuffleClient, handle, context, false)
var shuffleId = handle.shuffleId
try {
shuffleId = SparkUtils.celebornShuffleId(shuffleClient, handle, context, false)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this scenario, Celeborn would throw a CelebornException, so catching a CelebornIOException is incorrect. Moreover, I believe we should not catch the CelebornException itself, as we cannot identify the outcome when encountering it. I propose adding a bool success field to the PbGetShuffleIdResponse to indicate whether the shuffleId is available or not. This way, the client can throw a FetchFailureException if necessary, thereby improving the handling and expression of the situation.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for reviewing and i fix my code according to your views. cc @RexXiong

@cxzl25 cxzl25 requested a review from RexXiong April 21, 2025 09:00
@buska88

buska88 commented May 6, 2025

Copy link
Copy Markdown
Author
25/04/29 21:42:07 INFO dispatcher-CoarseGrainedScheduler TaskSetManager: Starting task 3928.1 in stage 23.0 (TID 29935, zw06-data-hdp-dn29550.mt, executor 5432, partition 3928, PROCESS_LOCAL, 8328 bytes)
25/04/29 21:42:07 WARN task-result-getter-3 TaskSetManager: Lost task 1434.0 in stage 23.0 (TID 26608, zw06-data-hdp-dn29550.mt, executor 5432): java.lang.OutOfMemoryError: GC overhead limit exceeded

25/04/29 21:42:07 INFO task-result-getter-3 TaskSetManager: Handle failed task, add task to pendingTasks, task 1434.0 in stage 23.0 (TID 26608, zw06-data-hdp-dn29550.mt, executor 5432)
25/04/29 21:42:07 INFO Reporter ApplicationMaster: AppMaster: targetNumExecutors=1400, pendingAllocate=0, runningExecutors=1400. 
25/04/29 21:42:07 INFO dispatcher-BlockManagerMaster BlockManagerMasterEndpoint: Registering block manager zw06-data-hdp-dn27371.mt:26633 with 2004.6 MiB RAM, BlockManagerId(6004, zw06-data-hdp-dn27371.mt, 26633, None)
25/04/29 21:42:07 INFO dispatcher-CoarseGrainedScheduler TaskSetManager: Starting task 1434.1 in stage 23.0 (TID 29936, zw06-data-hdp-dn27371.mt, executor 6004, partition 1434, PROCESS_LOCAL, 8328 bytes)
25/04/29 21:42:07 INFO dispatcher-CoarseGrainedScheduler TaskSetManager: Starting task 3929.1 in stage 23.0 (TID 29937, zw06-data-hdp-dn27371.mt, executor 6004, partition 3929, PROCESS_LOCAL, 8328 bytes)
25/04/29 21:42:07 INFO dispatcher-CoarseGrainedScheduler TaskSetManager: Starting task 3927.1 in stage 23.0 (TID 29938, zw06-data-hdp-dn27371.mt, executor 6004, partition 3927, PROCESS_LOCAL, 8328 bytes)
25/04/29 21:42:08 INFO dispatcher-CoarseGrainedScheduler YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 5432.
25/04/29 21:42:08 INFO Reporter ApplicationMaster: AppMaster: targetNumExecutors=1400, pendingAllocate=0, runningExecutors=1400. 
25/04/29 21:42:08 INFO dag-scheduler-event-loop DAGScheduler: Executor lost: 5432 (epoch 6)
25/04/29 21:42:08 INFO dispatcher-BlockManagerMaster BlockManagerMasterEndpoint: Trying to remove executor 5432 from BlockManagerMaster.
25/04/29 21:42:08 INFO dispatcher-BlockManagerMaster BlockManagerMasterEndpoint: Removing block manager BlockManagerId(5432, zw06-data-hdp-dn29550.mt, 17839, None)
25/04/29 21:42:08 INFO dag-scheduler-event-loop BlockManagerMaster: Removed 5432 successfully in removeExecutor
25/04/29 21:42:08 INFO dispatcher-CoarseGrainedScheduler YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.119.69.11:58434) with ID 5996
25/04/29 21:42:08 INFO spark-listener-group-executorManagement ExecutorMonitor: New executor 5996 has registered (new total is 1367)
25/04/29 21:42:08 INFO Reporter ApplicationMaster: AppMaster: targetNumExecutors=1400, pendingAllocate=0, runningExecutors=1400. 
25/04/29 21:42:08 INFO dispatcher-BlockManagerMaster BlockManagerMasterEndpoint: Registering block manager zw06-data-hdp-dn29769.mt:22001 with 2004.6 MiB RAM, BlockManagerId(5996, zw06-data-hdp-dn29769.mt, 22001, None)
25/04/29 21:42:08 INFO dispatcher-CoarseGrainedScheduler TaskSetManager: Starting task 135.1 in stage 23.0 (TID 29939, zw06-data-hdp-dn29769.mt, executor 5996, partition 135, PROCESS_LOCAL, 8328 bytes)
25/04/29 21:42:08 INFO dispatcher-CoarseGrainedScheduler TaskSetManager: Starting task 136.1 in stage 23.0 (TID 29940, zw06-data-hdp-dn29769.mt, executor 5996, partition 136, PROCESS_LOCAL, 8328 bytes)
25/04/29 21:42:08 INFO dispatcher-CoarseGrainedScheduler TaskSetManager: Starting task 137.1 in stage 23.0 (TID 29941, zw06-data-hdp-dn29769.mt, executor 5996, partition 137, PROCESS_LOCAL, 8328 bytes)
25/04/29 21:42:08 INFO dispatcher-CoarseGrainedScheduler YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 4786.
25/04/29 21:42:08 INFO Reporter ApplicationMaster: AppMaster: targetNumExecutors=1400, pendingAllocate=0, runningExecutors=1400. 
25/04/29 21:42:08 INFO dag-scheduler-event-loop DAGScheduler: Executor lost: 4786 (epoch 6)
25/04/29 21:42:08 INFO dispatcher-BlockManagerMaster BlockManagerMasterEndpoint: Trying to remove executor 4786 from BlockManagerMaster.
25/04/29 21:42:08 INFO dispatcher-BlockManagerMaster BlockManagerMasterEndpoint: Removing block manager BlockManagerId(4786, zw06-data-hdp-dn33950.mt, 35009, None)
25/04/29 21:42:08 INFO dag-scheduler-event-loop BlockManagerMaster: Removed 4786 successfully in removeExecutor
25/04/29 21:42:08 INFO Reporter ApplicationMaster: AppMaster: targetNumExecutors=1400, pendingAllocate=0, runningExecutors=1400. 
25/04/29 21:42:08 INFO dispatcher-CoarseGrainedScheduler YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.119.150.13:51114) with ID 5991
25/04/29 21:42:08 INFO spark-listener-group-executorManagement ExecutorMonitor: New executor 5991 has registered (new total is 1368)
25/04/29 21:42:08 ERROR celeborn-dispatcher-110 SparkUtils: Can not get TaskSetManager for taskId: 29935
25/04/29 21:42:08 INFO celeborn-dispatcher-110 LifecycleManager: handle fetch failure for appShuffleId 5 shuffleId 5

Find a new case for this pr.Task 29935 launch on executor 5432.When executor 5432 lost, SparkUtils cannot get TaskSetManager by taskid, because taskSet.removeRunningTask(tid) has been called.So this pr seems useful for this case.The task can throw a fetchfail exception and re run the stage
cc @RexXiong @cxzl25

@RexXiong RexXiong left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM.

}

public static int celebornShuffleId(
public static Tuple2<Integer, Boolean> celebornShuffleId(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Should we consider throwing a CelebornRuntimeException when an unavailable celebornShuffleId is encountered? This can allows to catch the exception within the CelebornShuffleReader while maintaining the existing method signature.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok i will work on it

@RexXiong RexXiong left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks!

Comment on lines +67 to +69
var shuffleId = handle.shuffleId
try {
shuffleId = SparkUtils.celebornShuffleId(shuffleClient, handle, context, false)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
var shuffleId = handle.shuffleId
try {
shuffleId = SparkUtils.celebornShuffleId(shuffleClient, handle, context, false)
val shuffleId = try {
SparkUtils.celebornShuffleId(shuffleClient, handle, context, false)
} catch {

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, fix that cc@cxzl25

@RexXiong RexXiong left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@RexXiong RexXiong closed this in 045411a May 13, 2025
@RexXiong

Copy link
Copy Markdown
Contributor

Thanks @buska88 Merge to main(v0.6.0) and also backport to branch-0.5(v0.5.5)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants