Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,24 @@ public long generateProducerId() {
}

private void maybeRequestNextBlock() {
if (nextProducerIdBlock.get() != null) {
return;
}
// KAFKA-20114 - Acquire requestInFlight before reading backoffDeadlineMs. The response handler
// updates backoffDeadlineMs before clearing requestInFlight, so a successful CAS
// after that clear observes the updated backoff and avoids a premature retry.
if (!requestInFlight.compareAndSet(false, true)) {
return;
}

var retryTimestamp = backoffDeadlineMs.get();
if (retryTimestamp == NO_RETRY || time.milliseconds() >= retryTimestamp) {
// Send a request only if we reached the retry deadline, or if no deadline was set.
if (nextProducerIdBlock.get() == null &&
requestInFlight.compareAndSet(false, true)) {
sendRequest();
}
var now = time.milliseconds();

if (retryTimestamp != NO_RETRY && now < retryTimestamp) {
requestInFlight.set(false);
return;
}
sendRequest();
}

protected void sendRequest() {
Expand Down Expand Up @@ -146,6 +156,9 @@ public void onTimeout() {
private void handleUnsuccessfulResponse() {
// There is no need to compare and set because only one thread
// handles the AllocateProducerIds response.

// KAFKA-20114 - Update the backoff before clearing requestInFlight. maybeRequestNextBlock
// relies on this ordering when it acquires requestInFlight before reading the deadline.
backoffDeadlineMs.set(time.milliseconds() + RETRY_BACKOFF_MS);
requestInFlight.set(false);
}
Expand Down