Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

Commit 10d1d92

Browse files
[fix] Recycle fetchRequest records when resultFuture already complete (#1618)
### Motivation There is currently a memory leak in the `handleFetchRequest` logic. This PR fixes that leak. Steps to observe a memory leak: 1. Connect consumer. 2. Send fetch request to broker, which creates a callback: https://github.com/streamnative/kop/blob/66efad61c853aa9044fd4b03e7bffc3823a36042/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java#L243 3. Close connection to broker before the callback completes, which triggers the logic to complete all uncompleted futures: https://github.com/streamnative/kop/blob/66efad61c853aa9044fd4b03e7bffc3823a36042/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java#L113-L120 Credit to @eolivelli for identifying that the close logic was leading to a memory leak. Note: this code contains the only reference to `ResponseCallbackWrapper`, which indicates that there are no other leaks with this same pattern currently present in the `master` code base. ### Modifications * Add logic that releases retained `ByteBuffers` in the event that the `resultFuture` is already completed.
1 parent 6653115 commit 10d1d92

File tree

1 file changed

+12
-6
lines changed

1 file changed

+12
-6
lines changed

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java

+12-6
Original file line numberDiff line numberDiff line change
@@ -1648,15 +1648,21 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch,
16481648
partitions.put(tp, data.toPartitionData());
16491649
});
16501650
partitions.putAll(erroneous);
1651-
resultFuture.complete(new ResponseCallbackWrapper(new FetchResponse<>(
1652-
Errors.NONE,
1653-
partitions,
1654-
THROTTLE_TIME_MS,
1655-
request.metadata().sessionId()), () ->
1656-
resultMap.forEach((__, readRecordsResult) -> {
1651+
boolean triggeredCompletion = resultFuture.complete(new ResponseCallbackWrapper(
1652+
new FetchResponse<>(
1653+
Errors.NONE,
1654+
partitions,
1655+
THROTTLE_TIME_MS,
1656+
request.metadata().sessionId()),
1657+
() -> resultMap.forEach((__, readRecordsResult) -> {
16571658
readRecordsResult.recycle();
16581659
})
16591660
));
1661+
if (!triggeredCompletion) {
1662+
resultMap.forEach((__, readRecordsResult) -> {
1663+
readRecordsResult.recycle();
1664+
});
1665+
}
16601666
context.recycle();
16611667
});
16621668
}

0 commit comments

Comments
 (0)