Skip to content

Commit 6557852

Browse files
lukekimclaude
andauthored
fix: enable streaming append for Kafka with Cayenne accelerator (spiceai#10269) (spiceai#10780)
Allow `refresh_mode: append` to use a connector-provided `append_stream` when the accelerator engine is Cayenne. Previously, the Cayenne engine was unconditionally excluded from the append-stream attachment path, so a Kafka dataset with `engine: cayenne` and `refresh_mode: append` failed because no streaming source was registered and Cayenne could not fall through to its primary_key validation (since Kafka messages have neither time_column nor primary_key). The check now attempts to attach the source's `append_stream` first. If the source provides one (Kafka, SpiceAI), it is used for all engines including Cayenne. If it does not, Cayenne still falls back to its primary_key validation; other engines require time_column. Co-authored-by: Claude <noreply@anthropic.com>
1 parent 276f9fc commit 6557852

1 file changed

Lines changed: 7 additions & 7 deletions

File tree

  • crates/runtime/src/datafusion

crates/runtime/src/datafusion/mod.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2398,15 +2398,15 @@ impl DataFusion {
23982398
}
23992399
}
24002400

2401-
// For append mode without time_column, check if source provides append_stream
2402-
// Skip this check for Cayenne which has its own validation (supports primary_key or time_column)
2403-
if refresh_mode == RefreshMode::Append
2404-
&& dataset.time_column.is_none()
2405-
&& acceleration_settings.engine != Engine::Cayenne
2406-
{
2401+
// For append mode without time_column, attach the source's append_stream
2402+
// when available (e.g. Kafka). This enables streaming append into any
2403+
// accelerator engine, including Cayenne. When the source does not
2404+
// provide an append_stream, Cayenne falls back to its own validation
2405+
// (supports primary_key); other engines require time_column.
2406+
if refresh_mode == RefreshMode::Append && dataset.time_column.is_none() {
24072407
if let Some(append_stream) = source.append_stream(source_table_provider) {
24082408
accelerated_table_builder.append_stream(append_stream);
2409-
} else {
2409+
} else if acceleration_settings.engine != Engine::Cayenne {
24102410
return Err(Error::AppendRequiresTimeColumn {
24112411
from: dataset.from.clone(),
24122412
});

0 commit comments

Comments
 (0)