Skip to content

dekaf: Fix log forwarding dropping some messages #2055

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 11, 2025

Conversation

jshearer
Copy link
Contributor

@jshearer jshearer commented Apr 8, 2025

Description:

I realized we were dropping some log messages on the floor, especially messages that were logged soon before a task exited. I ended up realizing that because the append futures were being directly provided to tokio::select!, they will get cancelled if another future in the select resolves first. Per convo w/ @jgraettinger, I refactored the append loop to:

  • First check if a new log or stats append can be started. If it can, start it and store it in a MaybeFuture::Future
  • Poll either/both MaybeFutures if they're not Done or Gone
  • Poll for new messages
  • Poll for a tick to send any gathered/rolled up stats since the last tick

Also per convo w/ @psFried, remove log forwarding from the registry. We never ended up setting a task name so no logs got sent, and it's best if we don't allow external data like from HTTP requests to get into the ops logs.


This change is Reviewable

…ll their references in order to prevent inadvertently cancelling a pending append when another message arrives.

I realized we were dropping some log messages on the floor, especially messages that were logged soon before a task exited. I ended up realizing that because the append futures were being directly provided to `tokio::select!`, they will get cancelled if another future in the select resolves first.

This was happening most noticeably when a Session crashed: the error log event containing the error didn't have enough time to fully append before the `mpsc::Receiver<TaskWriterMessage>` closed, which caused `tokio::select!` to cancel the call to `append_logs_to_writer`, dropping the logs contained within it on the ground.
@jshearer jshearer force-pushed the dekaf/fix_dropping_logs branch from f0d71b4 to 3a6c574 Compare April 8, 2025 21:34
We never ended up setting a task name so no logs got sent, and it's best if we don't allow external data like from HTTP requests to get into the ops logs
@jshearer jshearer force-pushed the dekaf/fix_dropping_logs branch from 3a6c574 to 3707c83 Compare April 9, 2025 12:58
@jshearer
Copy link
Contributor Author

jshearer commented Apr 9, 2025

Confirmed log forwarding still works locally and in dekaf-def, and that final Session messages are no longer dropped.

@jshearer jshearer marked this pull request as ready for review April 9, 2025 14:26
@jshearer jshearer requested a review from psFried April 9, 2025 14:26
Copy link
Member

@psFried psFried left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm good with merging this as is, and I don't see any problems. My general impression is that the start function could still be made simpler and clearer, and that handling logs that were generated before the shard is known doesn't seem worth the complexity. But I'm comfortable merging as is, and either deferring or abandoning further refactoring.

@jshearer jshearer merged commit 7e8a6b9 into master Apr 11, 2025
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants