KAFKA-20302: Receive buffers allocated from MemoryPool may not be released if request is invalid#21740
KAFKA-20302: Receive buffers allocated from MemoryPool may not be released if request is invalid#21740edoardocomar wants to merge 6 commits intoapache:trunkfrom
Conversation
…eased if request is invalid Return the buffer to the pool if Execptions are caught within processCompletedReceives() taking care of not returning the buffer twice as it can be returned in the RequestChannel.Request constructor.
|
thanks for your comments @akhileshchg |
| // note that even though we got an exception, we can assume that receive.source is valid. | ||
| // Issues with constructing a valid receive object were handled earlier | ||
| case e: Throwable => | ||
| if (header == null || req == null || header.apiKey.requiresDelayedAllocation) { |
There was a problem hiding this comment.
Question:
- Normally, when will the buffer gets released if the header
header.apiKey.requiresDelayedAllocation == true? - In this change, what happen if
header.apiKey.requiresDelayedAllocation == false?
There was a problem hiding this comment.
the buffer gets released if requiresDelayedAllocation is false
after being parsed inside the constructor of RequestChannel.Request
invoked on line 1027 of this PR version of SocketServer.scala
https://github.com/edoardocomar/kafka/blob/KAFKA-20302/core/src/main/scala/kafka/network/SocketServer.scala#L1027-L1028
but it only gets released there if (!header.apiKey.requiresDelayedAllocation)
see comment
//most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
//some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
//to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
There was a problem hiding this comment.
in this PR if header.apiKey.requiresDelayedAllocation == false and in line 1054 we called close inconditionally, nothing wrong happens as NetworkReceive.close() is a no-op if invoked a 2nd time.
But checking for the condition in the catch makes the code more informative, IMHO
There was a problem hiding this comment.
Thanks for the explanation. But could you point me where we will release the buffer when header.apiKey.requiresDelayedAllocation == true? It looks like we will close it when requestHandler completes the handling, is it correct? If so, then what will happen if the buffer is released in L1054 here, but then the request got processed with empty buffer?
There was a problem hiding this comment.
Hi I see if the request has been successfully enqueued,
https://github.com/edoardocomar/kafka/blob/KAFKA-20302/core/src/main/scala/kafka/network/SocketServer.scala#L1040
then the buffer it will be eventually released at the end of KafkaRequesthandler.run
https://github.com/edoardocomar/kafka/blob/KAFKA-20302/core/src/main/scala/kafka/server/KafkaRequestHandler.scala#L171
but it will be accessed during the handling and it will be null
so we should not release in SocketServer if successfully enqueued into the requestChannel
There was a problem hiding this comment.
Thanks @showuon I have restricted the release of the buffer in the catch to if (header == null || req == null)
as if successfully enqueued it will be released by the KafkaRequesthandler.
There was a problem hiding this comment.
I checked with a manual test where I was throwing a random simulated IllegalStateException
at the end of handleChannelMuteEvent
and the pool balance was maintained.
I am unable to automate this test though - would need a mock KafkaChannel.handleChannelMuteEvent
i was wondering whether to move channel.channelMetadataRegistry.registerClientInformation
after the enqueuing of the request
but API_VERSIONS's buffer has been deallocated already as it's not a delayed schema (for now)
| // be sure to decrease connection count and drop any in-flight responses | ||
| debug(s"Disconnecting expired channel: $channel : $header") | ||
| close(channel.id) | ||
| receive.close() // return buffer to memory pool |
There was a problem hiding this comment.
Should we check header.apiKey.requiresDelayedAllocation here?
There was a problem hiding this comment.
no there we should always close, as no RequestChannel.Request has been created there and no release of the buffer has occurred
| // } | ||
| // } | ||
| // return -1; | ||
| // } |
There was a problem hiding this comment.
I think we can keep the reason why we use reflection here. But I'm not sure if we need the example code in the comments here. How about we remove it?
There was a problem hiding this comment.
I agree I wanted some feedback on this
There was a problem hiding this comment.
alternatively we could make the memorypool accessor public with a comment // accessible for testing only ?
There was a problem hiding this comment.
rewritten the test comment
|
thanks @showuon for your comments. I believe I have addressed them. can you please re-review |
If an exception is thrown within
kafka.network.Processor#processCompletedReceives close the receive
(return the buffer to the memory pool) if it has not been returned
already. Buffer may have been returned when successfully creating the
RequestChannel.Request if the api did not require DelayedAllocation