[draft] [fix] [client] Fix IO buffer overflow when resending messages after producer reconnect #21351

Open · wants to merge 1 commit into master
@@ -2253,6 +2253,7 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long e
final boolean stripChecksum = cnx.getRemoteEndpointProtocolVersion() < brokerChecksumSupportedVersion();
Iterator<OpSendMsg> msgIterator = pendingMessages.iterator();
OpSendMsg pendingRegisteringOp = null;
int messageBytesSizeInCache = 0;
while (msgIterator.hasNext()) {
OpSendMsg op = msgIterator.next();
if (from != null) {
@@ -2284,12 +2285,18 @@
if (stripChecksum) {
stripChecksum(op);
}
// To avoid overflowing the IO buffer, split the resend into multiple flushes.
if (messageBytesSizeInCache + op.cmd.readableBytes() < messageBytesSizeInCache) {
cnx.ctx().flush();
Member:
flush is an async operation, so this might not work as expected

Contributor Author:
Since write and flush are executed on the same event-loop thread, the write and flush operations keep their execution order, so it will work as expected.
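
A small runnable illustration of the queueing behavior both comments refer to, using Netty's EmbeddedChannel (the demo class name is hypothetical, not part of the PR): write() only queues buffers in the outbound buffer, in call order, and nothing reaches the transport until flush().

    import io.netty.buffer.Unpooled;
    import io.netty.channel.embedded.EmbeddedChannel;
    import java.nio.charset.StandardCharsets;

    public class WriteFlushOrderDemo {
        public static void main(String[] args) {
            EmbeddedChannel ch = new EmbeddedChannel();
            // write() only queues the buffers; they are not yet "sent".
            ch.write(Unpooled.copiedBuffer("first", StandardCharsets.UTF_8));
            ch.write(Unpooled.copiedBuffer("second", StandardCharsets.UTF_8));
            System.out.println(ch.outboundMessages().size()); // 0 before flush
            // flush() hands the queued buffers to the transport in write order.
            ch.flush();
            System.out.println(ch.outboundMessages().size()); // 2 after flush
            ch.finishAndReleaseAll();
        }
    }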

Member:
I mean this from a different angle. I guess that in this case the purpose of flushing is to ensure that buffers don't overflow. It feels like the solution in this PR won't fully address that, since all operations will be queued up in any case unless the logic that calls write also runs in the ctx loop (thread). Another possibility would be to somehow wait for flush completion before writing more.

Member:
In Netty, there's the Channel.isWritable method and the channelWritabilityChanged callback that help keep the queued writes bounded (see the low/high watermark options). However, IIRC, the Pulsar code base doesn't contain examples of write logic implemented to take advantage of this type of backpressure solution.
Optimally, the logic would add more writes while the channel is writable and pause otherwise; adding writes should resume after the channel becomes writable again.
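
A minimal sketch of that writability-based backpressure pattern, assuming all calls happen on the channel's event loop; the class and method names are hypothetical and this is not Pulsar code:

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import java.util.ArrayDeque;
    import java.util.Queue;

    // Hypothetical sketch: write while the channel is writable (outbound
    // buffer below the high watermark) and resume from
    // channelWritabilityChanged once it drains below the low watermark.
    public class WritabilityAwareResender extends ChannelInboundHandlerAdapter {

        // Assumed to be accessed only from the channel's event loop,
        // so no synchronization is needed.
        private final Queue<ByteBuf> pending = new ArrayDeque<>();

        public void enqueue(ChannelHandlerContext ctx, ByteBuf msg) {
            pending.add(msg);
            drain(ctx);
        }

        private void drain(ChannelHandlerContext ctx) {
            // Stop queuing writes as soon as the outbound buffer
            // crosses the channel's high watermark.
            while (ctx.channel().isWritable() && !pending.isEmpty()) {
                ctx.write(pending.poll(), ctx.voidPromise());
            }
            ctx.flush();
        }

        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) {
            // Netty fires this when writability flips; resume draining
            // once the buffer drops below the low watermark again.
            if (ctx.channel().isWritable()) {
                drain(ctx);
            }
            ctx.fireChannelWritabilityChanged();
        }
    }

The watermarks themselves are configurable per channel via ChannelOption.WRITE_BUFFER_WATER_MARK, which takes a WriteBufferWaterMark of low/high byte thresholds.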

messageBytesSizeInCache = 0;
}
op.cmd.retain();
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Re-Sending message in cnx {}, sequenceId {}", topic, producerName,
cnx.channel(), op.sequenceId);
}
cnx.ctx().write(op.cmd, cnx.ctx().voidPromise());
messageBytesSizeInCache += op.cmd.readableBytes();
op.updateSentTimestamp();
stats.updateNumMsgsSent(op.numMessagesInBatch, op.batchSizeByte);
}
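
For reference, the guard added above (messageBytesSizeInCache + op.cmd.readableBytes() < messageBytesSizeInCache) is a signed-int overflow test: adding a positive size wraps the sum negative once the running total would exceed Integer.MAX_VALUE, so the intermediate flush fires roughly every 2 GiB of re-sent bytes. A standalone demonstration (hypothetical class name):

    public class OverflowGuardDemo {
        public static void main(String[] args) {
            int cached = Integer.MAX_VALUE - 10; // bytes accumulated so far
            int next = 100;                      // size of the next message
            // Signed int addition wraps on overflow, so a sum smaller than
            // the old total signals that the counter overflowed.
            boolean wouldOverflow = cached + next < cached;
            System.out.println(wouldOverflow); // true -> time to flush and reset
        }
    }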