Skip to content

Commit 13c38d5

Browse files
committed
dekaf: Handle Suspended journals in is_fetch_data_preview
Even though the existing behavior is doing the right thing, this is currently spamming a bunch of errors in the logs so let's clean it up and handle suspended journals here as well.
1 parent 346de28 commit 13c38d5

File tree

1 file changed

+40
-23
lines changed

1 file changed

+40
-23
lines changed

Diff for: crates/dekaf/src/session.rs

+40-23
Original file line numberDiff line numberDiff line change
@@ -1346,35 +1346,52 @@ impl Session {
13461346
.await?
13471347
.ok_or(anyhow::anyhow!("Collection {} not found", collection_name))?;
13481348

1349-
if let Some(
1350-
partition_offset @ PartitionOffset {
1351-
offset: latest_offset,
1352-
..
1353-
},
1354-
) = collection
1349+
match collection
13551350
.fetch_partition_offset(partition as usize, -1)
1356-
.await?
1351+
.await
13571352
{
1358-
// If fetch_offset is >= latest_offset, this is a caught-up consumer
1359-
// polling for new documents, not a data preview request.
1360-
if fetch_offset < latest_offset && latest_offset - fetch_offset < 13 {
1361-
tracing::info!(
1362-
latest_offset,
1363-
diff = latest_offset - fetch_offset,
1364-
"Marking session as data-preview"
1365-
);
1366-
Ok(Some(partition_offset))
1367-
} else {
1353+
Ok(Some(
1354+
partition_offset @ PartitionOffset {
1355+
offset: latest_offset,
1356+
..
1357+
},
1358+
)) => {
1359+
// If fetch_offset is >= latest_offset, this is a caught-up consumer
1360+
// polling for new documents, not a data preview request.
1361+
if fetch_offset < latest_offset && latest_offset - fetch_offset < 13 {
1362+
tracing::info!(
1363+
latest_offset,
1364+
diff = latest_offset - fetch_offset,
1365+
"Marking session as data-preview"
1366+
);
1367+
Ok(Some(partition_offset))
1368+
} else {
1369+
tracing::debug!(
1370+
fetch_offset,
1371+
latest_offset,
1372+
diff = latest_offset - fetch_offset,
1373+
"Marking session as non-data-preview"
1374+
);
1375+
Ok(None)
1376+
}
1377+
}
1378+
Ok(_) => Ok(None),
1379+
// Handle Suspended errors as not data preiew
1380+
Err(e)
1381+
if e.downcast_ref::<gazette::Error>().map_or(false, |err| {
1382+
matches!(
1383+
err,
1384+
gazette::Error::BrokerStatus(gazette::broker::Status::Suspended { .. })
1385+
)
1386+
}) =>
1387+
{
13681388
tracing::debug!(
1369-
fetch_offset,
1370-
latest_offset,
1371-
diff = latest_offset - fetch_offset,
1372-
"Marking session as non-data-preview"
1389+
"Partition is suspended, treating as non-data-preview: {:?}",
1390+
e
13731391
);
13741392
Ok(None)
13751393
}
1376-
} else {
1377-
Ok(None)
1394+
Err(e) => return Err(e),
13781395
}
13791396
}
13801397
}

0 commit comments

Comments
 (0)