Skip to content

[CELEBORN-1760][FOLLOWUP] Remove redundant release on data added in flushBuffer#3224

Closed
littlexyw wants to merge 3 commits into
apache:mainfrom
littlexyw:CELEBORN-1760-FOLLOWUP
Closed

[CELEBORN-1760][FOLLOWUP] Remove redundant release on data added in flushBuffer#3224
littlexyw wants to merge 3 commits into
apache:mainfrom
littlexyw:CELEBORN-1760-FOLLOWUP

Conversation

@littlexyw

@littlexyw littlexyw commented Apr 23, 2025

Copy link
Copy Markdown

What changes were proposed in this pull request?

Remove the redundant release of data after OutOfDirectMemoryError appears in flushBuffer.addComponent

Why are the changes needed?

The reason why OutOfDirectMemoryError will appear in flushBuffer.addComponent is that after adding a new component, CompositeByteBuf will determine whether the number of components exceeds the maximum limit. If it exceeds, the existing components will be merged into a large component. At this time, new off-heap memory will be requested. If there is insufficient memory at this time, OutOfDirectMemoryError will be reported, but the new component has been added to flushBuffer at this time. Releasing the new component at this time will cause refcnt error.
Don't worry about the component here not being released causing memory leaks, because it will be released normally in returnBuffer (flush or file destroy or file close).
If writeLocalData does not catch OutOfDirectMemoryError, the impact is as follows:

  1. In the case of a single copy, if [CELEBORN-1818] Fix incorrect timeout exception when waiting on no pending writes #3049 pr is not merged, commitfile will be blocked in waitPendingWrites and fail, because writeLocalData does not correctly decrementPendingWrites. However, this will not cause flushBuffer to exist in memory for a long time, because when shuffle expires, the file will be destroyed, flushBuffer will be returned, and this part of memory will be released.
  2. In the case of dual replicas, in addition to the above problems, the thread of the Eventloop to which replicate-client belongs will be blocked at Await.result(writePromise.future, Duration.Inf) because writePromise is not closed correctly. As a result, this thread will not process other PushData data written by worker-data-replicator to the channels of the Eventloop to which replicate-client belongs. This part of data accumulates in the taskQueue of EventLoop and cannot be canceled, which is the cause of memory leak.
image

Therefore, if the memory leak occurs after OutOfDirectMemoryError occurs in flushBuffer.addComponent, you only need to catch OutOfDirectMemoryError in writeLocalData, and there is no need to release data after addComponent.

I simulated the scenario where addCompoent had an OutOfDirectMemoryError, and released data after the OutOfDirectMemoryError occurred, and a refcnt error occurred.
oom_fix_error_release.log

At the same time, I simulated the scenario where addCompoent had an OutOfDirectMemoryError and did not release data after the OutOfDirectMemoryError occurred. No refcnt error occurred, commitfiles succeeded, the spark task succeeded, and after commitfiles, the worker diskbuffercount became 0.
celeborn_1760_followup_worker.log

Does this PR introduce any user-facing change?

No.

How was this patch tested?

manual test.

@FMX

FMX commented Apr 25, 2025

Copy link
Copy Markdown
Contributor

Ping @leixm.

@FMX

FMX commented Apr 25, 2025

Copy link
Copy Markdown
Contributor

@littlexyw Thanks for this PR. Your description is clear and makes sense. Can you provide some worker metrics for this PR, especially for the scenario in which the worker throws OOM?

@littlexyw

littlexyw commented Apr 25, 2025

Copy link
Copy Markdown
Author

@littlexyw Thanks for this PR. Your description is clear and makes sense. Can you provide some worker metrics for this PR, especially for the scenario in which the worker throws OOM?

企业微信截图_68dd45ce-f8f6-4cff-baae-87e00b126b57 企业微信截图_0abb033c-bb5b-4fd6-afff-0411370ce629 企业微信截图_efd9b01d-a9f8-4dea-813d-b840f9eb625a

@FMX FMX left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. The disk buffer won't be a trouble here because it won't change the memory manager's state.

@turboFei

turboFei commented Apr 28, 2025

Copy link
Copy Markdown
Member

For

override def writeInternal(buf: ByteBuf): Unit = {
buf.retain()
try {
flushBuffer.addComponent(true, buf)
} catch {
case oom: OutOfMemoryError =>
MemoryManager.instance.releaseMemoryFileStorage(buf.readableBytes())
throw oom
}
// memory tier writer will not flush
// add the bytes into flusher buffer is flush completed
val numBytes = buf.readableBytes()
metaHandler.afterFlush(numBytes)
MemoryManager.instance().incrementMemoryFileStorage(numBytes)

We should not call MemoryManager.instance.releaseMemoryFileStorage(buf.readableBytes()) in L323?

MemoryManager.instance.releaseMemoryFileStorage(buf.readableBytes())

@cfmcgrady cfmcgrady changed the title [CELEBORN-1760][FOLLOWUP] remove redundant release on data added in flushBuffer [CELEBORN-1760][FOLLOWUP] Remove redundant release on data added in flushBuffer May 6, 2025
@littlexyw

Copy link
Copy Markdown
Author

For

override def writeInternal(buf: ByteBuf): Unit = {
buf.retain()
try {
flushBuffer.addComponent(true, buf)
} catch {
case oom: OutOfMemoryError =>
MemoryManager.instance.releaseMemoryFileStorage(buf.readableBytes())
throw oom
}
// memory tier writer will not flush
// add the bytes into flusher buffer is flush completed
val numBytes = buf.readableBytes()
metaHandler.afterFlush(numBytes)
MemoryManager.instance().incrementMemoryFileStorage(numBytes)

We should not call MemoryManager.instance.releaseMemoryFileStorage(buf.readableBytes()) in L323?

MemoryManager.instance.releaseMemoryFileStorage(buf.readableBytes())

When OOM is thrown, buf.readableBytes should be calculated as if no exception was thrown, because buf has been added to flushBuffer. I submitted some new commits to this end.

@RexXiong RexXiong closed this in fff9725 May 8, 2025
@RexXiong

RexXiong commented May 8, 2025

Copy link
Copy Markdown
Contributor

Merge to main(v0.6.0). Could you also backport this pr to branch-0.5 @littlexyw ?

@littlexyw

Copy link
Copy Markdown
Author

Merge to main(v0.6.0). Could you also backport this pr to branch-0.5 @littlexyw ?

OK

FMX pushed a commit that referenced this pull request May 13, 2025
…lushBuffer

### What changes were proposed in this pull request?
backport #3224  to branch-0.5
Remove the redundant release of data after OutOfDirectMemoryError appears in flushBuffer.addComponent

### Why are the changes needed?
he reason why OutOfDirectMemoryError will appear in flushBuffer.addComponent is that after adding a new component, CompositeByteBuf will determine whether the number of components exceeds the maximum limit. If it exceeds, the existing components will be merged into a large component. At this time, new off-heap memory will be requested. If there is insufficient memory at this time, OutOfDirectMemoryError will be reported, but the new component has been added to flushBuffer at this time. Releasing the new component at this time will cause refcnt error.
Don't worry about the component here not being released causing memory leaks, because it will be released normally in returnBuffer (flush or file destroy or file close).
If writeLocalData does not catch OutOfDirectMemoryError, the impact is as follows:

In the case of a single copy, if #3049 pr is not merged, commitfile will be blocked in waitPendingWrites and fail, because writeLocalData does not correctly decrementPendingWrites. However, this will not cause flushBuffer to exist in memory for a long time, because when shuffle expires, the file will be destroyed, flushBuffer will be returned, and this part of memory will be released.
In the case of dual replicas, in addition to the above problems, the thread of the Eventloop to which replicate-client belongs will be blocked at Await.result(writePromise.future, Duration.Inf) because writePromise is not closed correctly. As a result, this thread will not process other PushData data written by worker-data-replicator to the channels of the Eventloop to which replicate-client belongs. This part of data accumulates in the taskQueue of EventLoop and cannot be canceled, which is the cause of memory leak.
![image](https://github.com/user-attachments/assets/c1a86463-e777-47fb-b709-868bd8390a17)

Therefore, if the memory leak occurs after OutOfDirectMemoryError occurs in flushBuffer.addComponent, you only need to catch OutOfDirectMemoryError in writeLocalData, and there is no need to release data after addComponent.

I simulated the scenario where addCompoent had an OutOfDirectMemoryError, and released data after the OutOfDirectMemoryError occurred, and a refcnt error occurred.
[oom_fix_error_release.log](https://github.com/user-attachments/files/19863484/oom_fix_error_release.log)

At the same time, I simulated the scenario where addCompoent had an OutOfDirectMemoryError and did not release data after the OutOfDirectMemoryError occurred. No refcnt error occurred, commitfiles succeeded, the spark task succeeded, and after commitfiles, the worker diskbuffercount became 0.
[celeborn_1760_followup_worker.log](https://github.com/user-attachments/files/19864486/celeborn_1760_followup_worker.log)

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
manual test.

Closes #3249 from littlexyw/CELEBORN-1760-FOLLOWUP-branch-0.5.

Authored-by: xinyuwang1 <xinyuwang1@xiaohongshu.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants