Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 63 additions & 15 deletions crates/data_components/src/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use futures::Stream;
use rdkafka::{
ClientConfig, Message, Offset,
config::RDKafkaLogLevel,
consumer::{CommitMode, Consumer, StreamConsumer},
consumer::{BaseConsumer, CommitMode, Consumer, Rebalance, StreamConsumer},
message::BorrowedMessage,
topic_partition_list::TopicPartitionList,
util::get_rdkafka_version,
Expand Down Expand Up @@ -217,11 +217,19 @@ pub struct KafkaMetrics {

struct KafkaConsumerContext {
metrics: Arc<KafkaMetrics>,
/// Offsets to seek to on the first partition assignment (restored from sidecar).
/// Populated at construction so the stash is in place before `subscribe()` can
/// trigger a rebalance. The first `Rebalance::Assign` takes the value; later
/// rebalances see `None` and fall back to the group-committed offset.
restore_offsets: std::sync::Mutex<Option<TopicPartitionList>>,
}

impl KafkaConsumerContext {
fn new(metrics: Arc<KafkaMetrics>) -> Self {
Self { metrics }
fn new(metrics: Arc<KafkaMetrics>, restore_offsets: Option<TopicPartitionList>) -> Self {
Self {
metrics,
restore_offsets: std::sync::Mutex::new(restore_offsets),
}
}
}

Expand Down Expand Up @@ -320,7 +328,35 @@ impl rdkafka::ClientContext for KafkaConsumerContext {
}
}

impl rdkafka::consumer::ConsumerContext for KafkaConsumerContext {}
impl rdkafka::consumer::ConsumerContext for KafkaConsumerContext {
fn post_rebalance(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'_>) {
if let Rebalance::Assign(_) = rebalance {
// On first assignment after subscribe, seek to sidecar offsets if available.
// Take the offsets so this only fires once on success.
let offsets = self
.restore_offsets
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.take();

if let Some(tpl) = offsets {
match base_consumer.seek_partitions(tpl.clone(), Duration::from_secs(5)) {
Ok(_) => {
tracing::info!("Restored Kafka consumer offsets from sidecar");
}
Err(e) => {
tracing::error!("Failed to seek to restored offsets: {e}");
// Re-insert offsets so the next rebalance retries the seek.
*self
.restore_offsets
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner) = Some(tpl);
}
}
}
}
}
}

pub struct KafkaConsumer {
group_id: String,
Expand All @@ -329,11 +365,16 @@ pub struct KafkaConsumer {
}

impl KafkaConsumer {
/// Construct a consumer for an existing consumer group, restoring partition
/// offsets from the sidecar before any rebalance can fire. Pass an empty
/// slice for `restore_offsets` when there is nothing to restore.
pub fn create_with_existing_group_id(
group_id: impl Into<String>,
kafka_config: &KafkaConfig,
restore_offsets: &[KafkaOffset],
) -> Result<Self> {
Self::create(group_id.into(), kafka_config)
let restore = Self::build_restore_tpl(restore_offsets)?;
Self::create(group_id.into(), kafka_config, restore)
}

pub fn create_for_dataset(
Expand All @@ -344,6 +385,7 @@ impl KafkaConsumer {
Self::create(
group_id.unwrap_or_else(|| Self::generate_group_id(dataset)),
kafka_config,
None,
)
}

Expand Down Expand Up @@ -408,9 +450,12 @@ impl KafkaConsumer {
.context(UnableToCommitConsumerStateSnafu)
}

pub fn restore_offsets(&self, offsets: &[KafkaOffset]) -> Result<()> {
/// Build a [`TopicPartitionList`] from sidecar offsets. Returns `None` when
/// the slice is empty so callers can pass the result straight into
/// [`KafkaConsumer::create`] for the no-restore case.
fn build_restore_tpl(offsets: &[KafkaOffset]) -> Result<Option<TopicPartitionList>> {
if offsets.is_empty() {
return Ok(());
return Ok(None);
}

let mut topic_partition_list = TopicPartitionList::new();
Expand All @@ -426,11 +471,7 @@ impl KafkaConsumer {
})?;
}

self.consumer
.commit(&topic_partition_list, CommitMode::Sync)
.context(UnableToRestoreOffsetsSnafu {
message: "Failed to commit sidecar offsets to Kafka".to_string(),
})
Ok(Some(topic_partition_list))
}

pub fn restart_topic(&self, topic: &str) -> Result<()> {
Expand Down Expand Up @@ -490,7 +531,11 @@ impl KafkaConsumer {
&self.metrics
}

fn create(group_id: String, kafka_config: &KafkaConfig) -> Result<Self> {
fn create(
group_id: String,
kafka_config: &KafkaConfig,
restore_offsets: Option<TopicPartitionList>,
) -> Result<Self> {
tracing::debug!("Using kafka group_id: {}", group_id);

let (_, version) = get_rdkafka_version();
Expand Down Expand Up @@ -548,7 +593,10 @@ impl KafkaConsumer {

let consumer: StreamConsumer<KafkaConsumerContext> = config
.set_log_level(RDKafkaLogLevel::Debug)
.create_with_context(KafkaConsumerContext::new(Arc::clone(&metrics)))
.create_with_context(KafkaConsumerContext::new(
Arc::clone(&metrics),
restore_offsets,
))
.context(UnableToCreateConsumerSnafu)?;

Ok(Self {
Expand All @@ -571,7 +619,7 @@ impl KafkaConsumer {
let temp_group_id = format!("spice-schema-peek-{}", uuid::Uuid::new_v4());
let mut peek_config = kafka_config.clone();
peek_config.metrics_store = None; // Avoid skewing real consumer metrics
let temp_consumer = Self::create(temp_group_id, &peek_config)?;
let temp_consumer = Self::create(temp_group_id, &peek_config, None)?;

// Fetch topic metadata to discover partitions
let metadata = temp_consumer
Expand Down
23 changes: 23 additions & 0 deletions crates/runtime/src/accelerated_table/refresh_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1062,6 +1062,29 @@ impl RefreshTask {

let info = match download_result {
Ok(Some(info)) => info,
Ok(None) if current_local_id.is_none() => {
// No snapshot has ever been loaded and none is available at the configured location.
tracing::warn!(
dataset = %self.dataset_name,
snapshot_location = %state.manager.snapshot_location(),
"refresh_mode: snapshot - no snapshot found at the configured location. Ensure the snapshot location is correct and that a snapshot has been created."
);
self.set_refresh_status(
None,
status::ComponentStatus::error_with_message(
"no snapshot available".to_string(),
),
)
.await;
return Err(RetryError::transient(
super::Error::FailedToRefreshDataset {
source: datafusion::error::DataFusionError::Internal(
"refresh_mode: snapshot - no snapshot found at the configured location"
.to_string(),
),
},
));
}
Ok(None) => {
tracing::debug!(
dataset = %self.dataset_name,
Expand Down
32 changes: 14 additions & 18 deletions crates/runtime/src/dataconnector/debezium.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,16 +362,6 @@ impl DataConnector for Debezium {
);
}

let kafka_consumer = KafkaConsumer::create_with_existing_group_id(
&metadata.consumer_group_id,
&self.kafka_config,
)
.boxed()
.context(super::UnableToGetReadProviderSnafu {
dataconnector: "debezium",
connector_component: ConnectorComponent::from(dataset),
})?;

ensure!(
topic == metadata.topic,
super::InvalidConfigurationNoSourceSnafu {
Expand Down Expand Up @@ -406,21 +396,27 @@ impl DataConnector for Debezium {
(metadata, Arc::new(schema))
};

// Build the consumer with the sidecar offsets already stashed so
// the first rebalance callback after `subscribe` seeks before any
// messages are delivered.
let kafka_consumer = KafkaConsumer::create_with_existing_group_id(
&metadata.consumer_group_id,
&self.kafka_config,
&metadata.offsets,
)
.boxed()
.context(super::UnableToGetReadProviderSnafu {
dataconnector: "debezium",
connector_component: ConnectorComponent::from(dataset),
})?;

kafka_consumer.subscribe(topic).boxed().context(
super::UnableToGetReadProviderSnafu {
dataconnector: "debezium",
connector_component: ConnectorComponent::from(dataset),
},
)?;

kafka_consumer
.restore_offsets(&metadata.offsets)
.boxed()
.context(super::UnableToGetReadProviderSnafu {
dataconnector: "debezium",
connector_component: ConnectorComponent::from(dataset),
})?;

(kafka_consumer, metadata, schema)
}
None => {
Expand Down
25 changes: 10 additions & 15 deletions crates/runtime/src/dataconnector/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,13 +495,16 @@ async fn init_kafka_consumer(
);
}

let kafka_consumer =
KafkaConsumer::create_with_existing_group_id(&metadata.consumer_group_id, kafka_config)
.boxed()
.context(super::UnableToGetReadProviderSnafu {
dataconnector: "kafka",
connector_component: ConnectorComponent::from(dataset),
})?;
let kafka_consumer = KafkaConsumer::create_with_existing_group_id(
&metadata.consumer_group_id,
kafka_config,
&metadata.offsets,
)
.boxed()
.context(super::UnableToGetReadProviderSnafu {
dataconnector: "kafka",
connector_component: ConnectorComponent::from(dataset),
})?;

kafka_consumer
.subscribe(topic)
Expand All @@ -511,14 +514,6 @@ async fn init_kafka_consumer(
connector_component: ConnectorComponent::from(dataset),
})?;

kafka_consumer
.restore_offsets(&metadata.offsets)
.boxed()
.context(super::UnableToGetReadProviderSnafu {
dataconnector: "kafka",
connector_component: ConnectorComponent::from(dataset),
})?;

Ok((kafka_consumer, metadata.schema))
}

Expand Down
7 changes: 6 additions & 1 deletion crates/runtime/src/embeddings/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,12 @@ impl TableProvider for EmbeddingTable {
}

fn get_logical_plan(&self) -> Option<Cow<'_, LogicalPlan>> {
self.base_table.get_logical_plan()
// EmbeddingTable augments the base table's schema with computed
// embedding columns. The base table's logical plan does not represent
// those columns, so forwarding it would cause `LogicalPlanBuilder::scan`
// to inline a plan whose schema is missing the embedding columns —
// any subsequent projection of those columns then fails.
None
}

fn schema(&self) -> SchemaRef {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
source: crates/runtime/tests/models/search.rs
expression: err.to_string()
---
HTTP error: 400 Bad Request - Search cannot be run on plain because it has no embeddings or full text search indexes.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
source: crates/runtime/tests/models/search.rs
expression: err.to_string()
---
HTTP error: 400 Bad Request - Search cannot be run on plain because it has no embeddings or full text search indexes.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
source: crates/runtime/tests/models/search.rs
expression: "normalize_search_response(resp, round_scores)"
---
{
"duration_ms": "duration_ms_val",
"results": []
}
Loading