Skip to content

Commit 057d268

Browse files
committed
dekaf: Move append_logs_to_writer outside of tokio::select! to prevent inadvertently cancelling a pending append.
I realized we were dropping some log messages on the floor, especially messages that were logged shortly before a task exited. The cause: because the call to `append_logs_to_writer` was used as one of the branch futures in `tokio::select!`, it gets cancelled if another branch's future resolves before it does. This was happening most noticeably when a Session crashed: the error log event containing the error didn't have enough time to fully append before the `mpsc::Receiver<TaskWriterMessage>` closed, which caused `tokio::select!` to cancel the call to `append_logs_to_writer`, dropping the logs contained within it on the floor.
1 parent 832534f commit 057d268

File tree

1 file changed

+11
-10
lines changed

1 file changed

+11
-10
lines changed

crates/dekaf/src/log_appender.rs

+11-10
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ const WELL_KNOWN_LOG_FIELDS: &'static [&'static str] = &[
278278
SESSION_TASK_NAME_FIELD_MARKER,
279279
SESSION_CLIENT_ID_FIELD_MARKER,
280280
];
281-
pub const LOG_MESSAGE_QUEUE_SIZE: usize = 50;
281+
pub const LOG_MESSAGE_QUEUE_SIZE: usize = 500;
282282

283283
impl<W: TaskWriter + 'static> TaskForwarder<W> {
284284
pub fn new(
@@ -353,21 +353,23 @@ impl<W: TaskWriter + 'static> TaskForwarder<W> {
353353
let mut pending_logs = Vec::new();
354354

355355
loop {
356-
tokio::select! {
356+
if pending_logs.len() > 0 {
357357
// We always want to start a new append before accumulating more log messages because in
358358
// the extreme case where we're getting messages faster than we can store them, we don't want
359-
// to end up with an infinitely growing buffer of `pending_logs`.
360-
biased;
361-
362-
Err(append_error) = Self::append_logs_to_writer(
359+
// to end up with an infinitely growing buffer of `pending_logs`. We also don't want `tokio::select!`
360+
// to cancel the append_logs_to_writer call when the next message comes in.
361+
if let Err(append_error) = Self::append_logs_to_writer(
363362
&mut writer,
364363
&mut pending_logs,
365364
shard_ref.clone(),
366365
uuid_producer.clone(),
367-
), if pending_logs.len() > 0 => {
366+
)
367+
.await
368+
{
368369
tracing::error!(?append_error, "Error appending logs");
369370
}
370-
371+
}
372+
tokio::select! {
371373
_ = stats_interval.tick() => {
372374
// Take current stats and write if non-zero
373375
if let Some(current_stats) = stats.take(){
@@ -406,8 +408,7 @@ impl<W: TaskWriter + 'static> TaskForwarder<W> {
406408
Some(TaskWriterMessage::Stats((collection_name, new_stats))) => {
407409
stats.add(collection_name, new_stats);
408410
}
409-
Some(TaskWriterMessage::Shutdown) => break,
410-
None => break,
411+
Some(TaskWriterMessage::Shutdown) | None => break,
411412
}
412413
},
413414
}

0 commit comments

Comments
 (0)