KAFKA-17871: avoid blocking the herder thread when producer flushing hangs #18142
base: trunk
Hi @davide-armand |
1563874 to fd1bddc
The CI is now green.
A label of 'needs-attention' was automatically added to this PR in order to raise the |
This PR is being marked as stale since it has not had any activity in 90 days. If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Hi @davide-armand Thanks for the fix, and thanks for the test!
Are you able to remove the Thread.sleep calls from the test? These could introduce some flakiness later if run on slow machines. Additionally I think this test leaks threads through the executors. Could you ensure that the executors are closed, and the threads terminate?
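One way to address both points in a test, sketched here under assumptions: a `CountDownLatch` replaces `Thread.sleep` so the test waits exactly until the background task is known to be running, and the executor is shut down in a `finally` block and then awaited so no thread outlives the test. The class and method names (`ExecutorHygieneSketch`, `runWithoutSleepOrLeaks`) are hypothetical and are not taken from the PR's test.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; not the test from the PR.
class ExecutorHygieneSketch {

    // Returns true if the background task ran and the executor thread terminated.
    static boolean runWithoutSleepOrLeaks() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1); // signals "task is running"
        CountDownLatch release = new CountDownLatch(1); // lets the task finish
        boolean taskOk = false;
        try {
            Future<?> hanging = executor.submit(() -> {
                started.countDown();
                release.await(); // simulates a call that blocks until released
                return null;
            });
            // Instead of Thread.sleep(...): wait until the task has actually started.
            if (started.await(5, TimeUnit.SECONDS)) {
                // Assertions about the code under test would go here.
                release.countDown();
                hanging.get(5, TimeUnit.SECONDS);
                taskOk = true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            // ExecutionException or TimeoutException: treat as a failed run.
        } finally {
            // Always stop the executor so its thread does not leak.
            executor.shutdownNow();
        }
        try {
            // Confirm the worker thread has really terminated.
            return taskOk && executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("clean run: " + runWithoutSleepOrLeaks());
    }
}
```

The bounded `await` and `get` calls keep a broken test from hanging forever on a slow machine, while still returning as soon as the condition holds.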
…hangs The call to `backingStore.get()` (called by connector task threads through `OffsetStorageReaderImpl.offsets()`) can block for a long time waiting for a data flush to complete (`KafkaProducer.flush()`). This change moves that call outside the synchronized block that locks `offsetReadFutures`, so that a hanging `backingStore.get()` no longer keeps `offsetReadFutures` locked. The check of the `closed` flag (`closed.get()`) stays inside the synchronized block to avoid a race condition with `close()`. This is important because `OffsetStorageReaderImpl.close()` also needs to lock `offsetReadFutures` in order to cancel the futures. Since the herder thread calls `OffsetStorageReaderImpl.close()` when attempting to stop a task, before this change the herder thread would hang indefinitely waiting for `backingStore.get()` to complete.
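The locking pattern described in this commit message can be illustrated with a minimal sketch. It is not the actual diff: `ReaderSketch`, `startRead`, the `Supplier` standing in for `backingStore.get()`, the `String` offset type, and the use of `IllegalStateException` are all hypothetical, and re-checking `closed` after the blocking call is an assumption made for the sketch. Only the names `offsetReadFutures` and `closed` come from the description.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical stand-in for OffsetStorageReaderImpl; not the real Kafka Connect code.
class ReaderSketch {
    private final Set<Future<String>> offsetReadFutures = new HashSet<>();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Supplier<Future<String>> backingStoreGet; // stand-in for backingStore.get()

    ReaderSketch(Supplier<Future<String>> backingStoreGet) {
        this.backingStoreGet = backingStoreGet;
    }

    Future<String> startRead() {
        synchronized (offsetReadFutures) {
            // The closed check stays under the lock to avoid racing with close().
            if (closed.get()) throw new IllegalStateException("reader is closed");
        }
        // Potentially blocking call, made WITHOUT holding the lock.
        Future<String> future = backingStoreGet.get();
        synchronized (offsetReadFutures) {
            // Assumption of this sketch: re-check, since close() may have run meanwhile.
            if (closed.get()) {
                future.cancel(true);
                throw new IllegalStateException("reader was closed during the read");
            }
            offsetReadFutures.add(future);
        }
        return future;
    }

    void close() {
        if (!closed.getAndSet(true)) {
            // Needs the same lock in order to cancel outstanding futures.
            synchronized (offsetReadFutures) {
                for (Future<String> f : offsetReadFutures) f.cancel(true);
                offsetReadFutures.clear();
            }
        }
    }
}

class ReaderSketchDemo {
    // Returns true if close() completes while a read is still stuck in the store.
    static boolean closeReturnsWhileReadHangs() {
        CountDownLatch enteredGet = new CountDownLatch(1);
        CountDownLatch releaseGet = new CountDownLatch(1);
        ReaderSketch reader = new ReaderSketch(() -> {
            enteredGet.countDown();
            try {
                releaseGet.await(); // simulates a flush that hangs
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return CompletableFuture.completedFuture("offset");
        });
        Thread task = new Thread(() -> {
            try {
                reader.startRead();
            } catch (IllegalStateException expected) {
                // The reader was closed while the read was in flight.
            }
        });
        task.setDaemon(true);
        task.start();
        try {
            if (!enteredGet.await(5, TimeUnit.SECONDS)) return false;
            Thread closer = new Thread(reader::close); // plays the herder thread
            closer.setDaemon(true);
            closer.start();
            closer.join(5000);
            boolean closedInTime = !closer.isAlive();
            releaseGet.countDown(); // let the simulated hang finish
            task.join(5000);
            return closedInTime && !task.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("close() returned during hanging read: " + closeReturnsWhileReadHangs());
    }
}
```

If the blocking call were made inside the first `synchronized` block instead, the thread calling `close()` in this demo would wait on the lock until the simulated hang was released, which is the herder-thread stall the commit message describes.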
6d22bc4 to 07f465e
07f465e to b90bf7a
@gharris1727 Pushed a fix in a separate commit, will squash before merging. I cannot get a green build; if I'm not mistaken, it fails on unrelated tests.
The call to `backingStore.get()` (called by connector task threads through `OffsetStorageReaderImpl.offsets()`) can block for a long time waiting for a data flush to complete (`KafkaProducer.flush()`).

This change moves that call outside the synchronized block that locks `offsetReadFutures`, so that a hanging `backingStore.get()` no longer keeps `offsetReadFutures` locked. The check of the `closed` flag (`closed.get()`) stays inside the synchronized block to avoid a race condition with `close()`.

This is important because `OffsetStorageReaderImpl.close()` also needs to lock `offsetReadFutures` in order to cancel the futures.

Since the herder thread calls `OffsetStorageReaderImpl.close()` when attempting to stop a task, before this change the herder thread would hang indefinitely waiting for `backingStore.get()` to complete.

Committer Checklist (excluded from commit message)