From 729899b359d47bbbcf30a84a867a85ff8932d57d Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc <879445+phillipleblanc@users.noreply.github.com> Date: Fri, 22 May 2026 17:10:16 +0900 Subject: [PATCH 1/3] fix(tests): unbreak integration_models search CI (#11008) Two independent fixes: * EmbeddingTable::get_logical_plan now returns None. The forwarding added in #10795 caused LogicalPlanBuilder::scan to inline the inner ViewTable's logical plan (base columns only), then fail when the caller projected the embedding columns that EmbeddingTable adds on top. This made test_multi_column_search_view (and likely the flaky s3_vectors view+embeddings tests) consistently fail. Mirrors the existing IndexedTableProvider pattern, which already returns None for the same reason. * Add the missing snapshot files for test_explicit_dataset_search- ability_validation (introduced in #10968 without them): - explicit_non_searchable_dataset_errors_error_response - explicit_mixed_datasets_fail_fast_error_response - explicit_searchable_dataset_no_matches_returns_empty_response --- crates/runtime/src/embeddings/table.rs | 7 ++++++- ..._explicit_mixed_datasets_fail_fast_error_response.snap | 5 +++++ ...icit_non_searchable_dataset_errors_error_response.snap | 5 +++++ ...rchable_dataset_no_matches_returns_empty_response.snap | 8 ++++++++ 4 files changed, 24 insertions(+), 1 deletion(-) create mode 100644 crates/runtime/tests/models/snapshots/integration_models__models__search__explicit_mixed_datasets_fail_fast_error_response.snap create mode 100644 crates/runtime/tests/models/snapshots/integration_models__models__search__explicit_non_searchable_dataset_errors_error_response.snap create mode 100644 crates/runtime/tests/models/snapshots/integration_models__models__search__explicit_searchable_dataset_no_matches_returns_empty_response.snap diff --git a/crates/runtime/src/embeddings/table.rs b/crates/runtime/src/embeddings/table.rs index c2864d3748..9eaeabfd2a 100644 --- a/crates/runtime/src/embeddings/table.rs +++ b/crates/runtime/src/embeddings/table.rs @@ -823,7 +823,12 @@ impl TableProvider for EmbeddingTable { } fn get_logical_plan(&self) -> Option> { - self.base_table.get_logical_plan() + // EmbeddingTable augments the base table's schema with computed + // embedding columns. The base table's logical plan does not represent + // those columns, so forwarding it would cause `LogicalPlanBuilder::scan` + // to inline a plan whose schema is missing the embedding columns — + // any subsequent projection of those columns then fails. + None } fn schema(&self) -> SchemaRef { diff --git a/crates/runtime/tests/models/snapshots/integration_models__models__search__explicit_mixed_datasets_fail_fast_error_response.snap b/crates/runtime/tests/models/snapshots/integration_models__models__search__explicit_mixed_datasets_fail_fast_error_response.snap new file mode 100644 index 0000000000..c9865ba130 --- /dev/null +++ b/crates/runtime/tests/models/snapshots/integration_models__models__search__explicit_mixed_datasets_fail_fast_error_response.snap @@ -0,0 +1,5 @@ +--- +source: crates/runtime/tests/models/search.rs +expression: err.to_string() +--- +HTTP error: 400 Bad Request - Search cannot be run on plain because it has no embeddings or full text search indexes. diff --git a/crates/runtime/tests/models/snapshots/integration_models__models__search__explicit_non_searchable_dataset_errors_error_response.snap b/crates/runtime/tests/models/snapshots/integration_models__models__search__explicit_non_searchable_dataset_errors_error_response.snap new file mode 100644 index 0000000000..c9865ba130 --- /dev/null +++ b/crates/runtime/tests/models/snapshots/integration_models__models__search__explicit_non_searchable_dataset_errors_error_response.snap @@ -0,0 +1,5 @@ +--- +source: crates/runtime/tests/models/search.rs +expression: err.to_string() +--- +HTTP error: 400 Bad Request - Search cannot be run on plain because it has no embeddings or full text search indexes. diff --git a/crates/runtime/tests/models/snapshots/integration_models__models__search__explicit_searchable_dataset_no_matches_returns_empty_response.snap b/crates/runtime/tests/models/snapshots/integration_models__models__search__explicit_searchable_dataset_no_matches_returns_empty_response.snap new file mode 100644 index 0000000000..7b1f9efda0 --- /dev/null +++ b/crates/runtime/tests/models/snapshots/integration_models__models__search__explicit_searchable_dataset_no_matches_returns_empty_response.snap @@ -0,0 +1,8 @@ +--- +source: crates/runtime/tests/models/search.rs +expression: "normalize_search_response(resp, round_scores)" +--- +{ + "duration_ms": "duration_ms_val", + "results": [] +} From 335a564bfb04f71e4a861aa7d9c37960a81a3915 Mon Sep 17 00:00:00 2001 From: Sergei Grebnov Date: Fri, 22 May 2026 11:43:59 +0300 Subject: [PATCH 2/3] Fix: `refresh_mode: snapshot` reports Ready with empty data when no snapshot exists (#10979) Co-authored-by: Phillip LeBlanc <879445+phillipleblanc@users.noreply.github.com> --- .../src/accelerated_table/refresh_task.rs | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/crates/runtime/src/accelerated_table/refresh_task.rs b/crates/runtime/src/accelerated_table/refresh_task.rs index e09fcce24a..a17c7a956e 100644 --- a/crates/runtime/src/accelerated_table/refresh_task.rs +++ b/crates/runtime/src/accelerated_table/refresh_task.rs @@ -1062,6 +1062,29 @@ impl RefreshTask { let info = match download_result { Ok(Some(info)) => info, + Ok(None) if current_local_id.is_none() => { + // No snapshot has ever been loaded and none is available at the configured location. + tracing::warn!( + dataset = %self.dataset_name, + snapshot_location = %state.manager.snapshot_location(), + "refresh_mode: snapshot - no snapshot found at the configured location. Ensure the snapshot location is correct and that a snapshot has been created." + ); + self.set_refresh_status( + None, + status::ComponentStatus::error_with_message( + "no snapshot available".to_string(), + ), + ) + .await; + return Err(RetryError::transient( + super::Error::FailedToRefreshDataset { + source: datafusion::error::DataFusionError::Internal( + "refresh_mode: snapshot - no snapshot found at the configured location" + .to_string(), + ), + }, + )); + } Ok(None) => { tracing::debug!( dataset = %self.dataset_name, From 70f76b55db3d2950839c89f561280a82d16fc440 Mon Sep 17 00:00:00 2001 From: Evgenii Khramkov Date: Fri, 22 May 2026 20:52:26 +0900 Subject: [PATCH 3/3] fix(kafka): seek to sidecar offsets via post_rebalance callback on restart (#11007) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix lint * fix: re-insert offsets on seek failure and fix clippy redundant closures * fix(kafka): stash sidecar offsets at construction to close subscribe race The previous fix had callers invoke restore_offsets() after subscribe(), relying on the post_rebalance callback to fire later. Under cold-start or slow-broker conditions the assignment can arrive before the setter writes the stash — the callback sees None and the consumer falls back to auto.offset.reset=smallest, silently replaying from the beginning. Move the offsets into the consumer context at construction so the stash is in place before subscribe() can trigger a rebalance. The standalone restore_offsets() method is replaced by an argument on create_with_existing_group_id. Debezium's consumer construction moves below the schema-evolution check so post-evolution metadata.offsets are the ones threaded through. --------- Co-authored-by: Phillip LeBlanc <879445+phillipleblanc@users.noreply.github.com> --- crates/data_components/src/kafka.rs | 78 ++++++++++++++++---- crates/runtime/src/dataconnector/debezium.rs | 32 ++++---- crates/runtime/src/dataconnector/kafka.rs | 25 +++---- 3 files changed, 87 insertions(+), 48 deletions(-) diff --git a/crates/data_components/src/kafka.rs b/crates/data_components/src/kafka.rs index ebaac22dce..aabe71e000 100644 --- a/crates/data_components/src/kafka.rs +++ b/crates/data_components/src/kafka.rs @@ -28,7 +28,7 @@ use futures::Stream; use rdkafka::{ ClientConfig, Message, Offset, config::RDKafkaLogLevel, - consumer::{CommitMode, Consumer, StreamConsumer}, + consumer::{BaseConsumer, CommitMode, Consumer, Rebalance, StreamConsumer}, message::BorrowedMessage, topic_partition_list::TopicPartitionList, util::get_rdkafka_version, @@ -217,11 +217,19 @@ pub struct KafkaMetrics { struct KafkaConsumerContext { metrics: Arc, + /// Offsets to seek to on the first partition assignment (restored from sidecar). + /// Populated at construction so the stash is in place before `subscribe()` can + /// trigger a rebalance. The first `Rebalance::Assign` takes the value; later + /// rebalances see `None` and fall back to the group-committed offset. + restore_offsets: std::sync::Mutex>, } impl KafkaConsumerContext { - fn new(metrics: Arc) -> Self { - Self { metrics } + fn new(metrics: Arc, restore_offsets: Option) -> Self { + Self { + metrics, + restore_offsets: std::sync::Mutex::new(restore_offsets), + } } } @@ -320,7 +328,35 @@ impl rdkafka::ClientContext for KafkaConsumerContext { } } -impl rdkafka::consumer::ConsumerContext for KafkaConsumerContext {} +impl rdkafka::consumer::ConsumerContext for KafkaConsumerContext { + fn post_rebalance(&self, base_consumer: &BaseConsumer, rebalance: &Rebalance<'_>) { + if let Rebalance::Assign(_) = rebalance { + // On first assignment after subscribe, seek to sidecar offsets if available. + // Take the offsets so this only fires once on success. + let offsets = self + .restore_offsets + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .take(); + + if let Some(tpl) = offsets { + match base_consumer.seek_partitions(tpl.clone(), Duration::from_secs(5)) { + Ok(_) => { + tracing::info!("Restored Kafka consumer offsets from sidecar"); + } + Err(e) => { + tracing::error!("Failed to seek to restored offsets: {e}"); + // Re-insert offsets so the next rebalance retries the seek. + *self + .restore_offsets + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(tpl); + } + } + } + } + } +} pub struct KafkaConsumer { group_id: String, @@ -329,11 +365,16 @@ pub struct KafkaConsumer { } impl KafkaConsumer { + /// Construct a consumer for an existing consumer group, restoring partition + /// offsets from the sidecar before any rebalance can fire. Pass an empty + /// slice for `restore_offsets` when there is nothing to restore. pub fn create_with_existing_group_id( group_id: impl Into, kafka_config: &KafkaConfig, + restore_offsets: &[KafkaOffset], ) -> Result { - Self::create(group_id.into(), kafka_config) + let restore = Self::build_restore_tpl(restore_offsets)?; + Self::create(group_id.into(), kafka_config, restore) } pub fn create_for_dataset( @@ -344,6 +385,7 @@ impl KafkaConsumer { Self::create( group_id.unwrap_or_else(|| Self::generate_group_id(dataset)), kafka_config, + None, ) } @@ -408,9 +450,12 @@ impl KafkaConsumer { .context(UnableToCommitConsumerStateSnafu) } - pub fn restore_offsets(&self, offsets: &[KafkaOffset]) -> Result<()> { + /// Build a [`TopicPartitionList`] from sidecar offsets. Returns `None` when + /// the slice is empty so callers can pass the result straight into + /// [`KafkaConsumer::create`] for the no-restore case. + fn build_restore_tpl(offsets: &[KafkaOffset]) -> Result> { if offsets.is_empty() { - return Ok(()); + return Ok(None); } let mut topic_partition_list = TopicPartitionList::new(); @@ -426,11 +471,7 @@ impl KafkaConsumer { })?; } - self.consumer - .commit(&topic_partition_list, CommitMode::Sync) - .context(UnableToRestoreOffsetsSnafu { - message: "Failed to commit sidecar offsets to Kafka".to_string(), - }) + Ok(Some(topic_partition_list)) } pub fn restart_topic(&self, topic: &str) -> Result<()> { @@ -490,7 +531,11 @@ impl KafkaConsumer { &self.metrics } - fn create(group_id: String, kafka_config: &KafkaConfig) -> Result { + fn create( + group_id: String, + kafka_config: &KafkaConfig, + restore_offsets: Option, + ) -> Result { tracing::debug!("Using kafka group_id: {}", group_id); let (_, version) = get_rdkafka_version(); @@ -548,7 +593,10 @@ impl KafkaConsumer { let consumer: StreamConsumer = config .set_log_level(RDKafkaLogLevel::Debug) - .create_with_context(KafkaConsumerContext::new(Arc::clone(&metrics))) + .create_with_context(KafkaConsumerContext::new( + Arc::clone(&metrics), + restore_offsets, + )) .context(UnableToCreateConsumerSnafu)?; Ok(Self { @@ -571,7 +619,7 @@ impl KafkaConsumer { let temp_group_id = format!("spice-schema-peek-{}", uuid::Uuid::new_v4()); let mut peek_config = kafka_config.clone(); peek_config.metrics_store = None; // Avoid skewing real consumer metrics - let temp_consumer = Self::create(temp_group_id, &peek_config)?; + let temp_consumer = Self::create(temp_group_id, &peek_config, None)?; // Fetch topic metadata to discover partitions let metadata = temp_consumer diff --git a/crates/runtime/src/dataconnector/debezium.rs b/crates/runtime/src/dataconnector/debezium.rs index b20f00b284..5833a90e70 100644 --- a/crates/runtime/src/dataconnector/debezium.rs +++ b/crates/runtime/src/dataconnector/debezium.rs @@ -362,16 +362,6 @@ impl DataConnector for Debezium { ); } - let kafka_consumer = KafkaConsumer::create_with_existing_group_id( - &metadata.consumer_group_id, - &self.kafka_config, - ) - .boxed() - .context(super::UnableToGetReadProviderSnafu { - dataconnector: "debezium", - connector_component: ConnectorComponent::from(dataset), - })?; - ensure!( topic == metadata.topic, super::InvalidConfigurationNoSourceSnafu { @@ -406,6 +396,20 @@ impl DataConnector for Debezium { (metadata, Arc::new(schema)) }; + // Build the consumer with the sidecar offsets already stashed so + // the first rebalance callback after `subscribe` seeks before any + // messages are delivered. + let kafka_consumer = KafkaConsumer::create_with_existing_group_id( + &metadata.consumer_group_id, + &self.kafka_config, + &metadata.offsets, + ) + .boxed() + .context(super::UnableToGetReadProviderSnafu { + dataconnector: "debezium", + connector_component: ConnectorComponent::from(dataset), + })?; + kafka_consumer.subscribe(topic).boxed().context( super::UnableToGetReadProviderSnafu { dataconnector: "debezium", @@ -413,14 +417,6 @@ impl DataConnector for Debezium { }, )?; - kafka_consumer - .restore_offsets(&metadata.offsets) - .boxed() - .context(super::UnableToGetReadProviderSnafu { - dataconnector: "debezium", - connector_component: ConnectorComponent::from(dataset), - })?; - (kafka_consumer, metadata, schema) } None => { diff --git a/crates/runtime/src/dataconnector/kafka.rs b/crates/runtime/src/dataconnector/kafka.rs index ce56eb3076..98c99531a2 100644 --- a/crates/runtime/src/dataconnector/kafka.rs +++ b/crates/runtime/src/dataconnector/kafka.rs @@ -495,13 +495,16 @@ async fn init_kafka_consumer( ); } - let kafka_consumer = - KafkaConsumer::create_with_existing_group_id(&metadata.consumer_group_id, kafka_config) - .boxed() - .context(super::UnableToGetReadProviderSnafu { - dataconnector: "kafka", - connector_component: ConnectorComponent::from(dataset), - })?; + let kafka_consumer = KafkaConsumer::create_with_existing_group_id( + &metadata.consumer_group_id, + kafka_config, + &metadata.offsets, + ) + .boxed() + .context(super::UnableToGetReadProviderSnafu { + dataconnector: "kafka", + connector_component: ConnectorComponent::from(dataset), + })?; kafka_consumer .subscribe(topic) @@ -511,14 +514,6 @@ async fn init_kafka_consumer( connector_component: ConnectorComponent::from(dataset), })?; - kafka_consumer - .restore_offsets(&metadata.offsets) - .boxed() - .context(super::UnableToGetReadProviderSnafu { - dataconnector: "kafka", - connector_component: ConnectorComponent::from(dataset), - })?; - Ok((kafka_consumer, metadata.schema)) }