Skip to content

Commit ebb41f1

Browse files
lukekimJeadieSpice Snapshot Update Botgithub-actions[bot]ewgenius
authored
Add DuckDB vector engine support (spiceai#10562)
* Add self-hosted Spice connector support * Enhance RefreshTask to handle pre-delete rows for upserts and add tests for endpoint scheme validation * Refactor encode_data_update function to use lifetime annotations for better clarity * Refactor data exchange handling to support streaming snapshots and improve batch encoding * fix: enhance upsert handling with primary key validation and improve error reporting * fix: pass dataset name as a reference in upsert pre-delete rows function * Add DuckDB vector engine support * Address DuckDB vector engine PR feedback * Add DuckDB HNSW search integration coverage * Remove DuckDB ef_search alias * Remove DuckDB ef_construction alias * Address DuckDB vector engine PR feedback * Revert "Add self-hosted Spice connector support" This reverts commit 10af21f. * fix merge * better Error enum * Support views on DDL catalogs (spiceai#10554) * Support views on DDL catalogs * fix ref * fix variable * fix compile * fix: invert table_exists loop condition for view dependency wait The view dependency polling loop had an inverted condition: it retried while table_exists() returned true (table found) and broke when it returned false (table not found). This caused all view tests to fail — the loop would spin until the deadline with the table already present, log 'does not exist, retrying...' throughout, then exit on timeout and report the view as failed. Fix: negate the condition so the loop retries while the table is absent (!table_exists) and exits as soon as the table appears. * fix: table_exists takes &TableReference to satisfy clippy::needless_pass_by_value * bad merge * Update datafusion (spiceai#10422) * update datafusion * fix: Update test snapshots * fix: Update test snapshots * fixes for _score in vector UDTF * remove bas snapshots * fix: Update Search integration test snapshots * search: stabilize vector_search score column for rrf recency plans * revert these snapshots * fix: Update Search integration test snapshots * Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * update snapshot * remove * fix build * fix: remove unused col import The col function from datafusion_expr is no longer used in embeddings/udtf.rs after the upstream datafusion update; the lint job runs with -D warnings, so the unused import broke the build. * snapshot * update DF * Fix DF * fixes * improved testing for RRF * snapshots * normalize remove * fix: Update Search integration test snapshots (spiceai#10567) Co-authored-by: Spice Snapshot Update Bot <spiceaibot@spice.ai> * test: update insta snapshots --------- Co-authored-by: Spice Snapshot Update Bot <spiceaibot@spice.ai> Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: Evgenii Khramkov <evgenii@spice.ai> Co-authored-by: Luke Kim <80174+lukekim@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: claudespice <claude@spice.ai> * Improve full-text search indexing performance (spiceai#10464) * Improve tantivy FTS ingest performance * Improve tests * Rollback on error path * remove index as unnecessary --------- Co-authored-by: Jack Eadie <jack@spice.ai> * add to search integration tests * fix compile * update docs * Address DuckDB vector review feedback * fix boxed * Address DuckDB vector follow-up review feedback * Bound DuckDB vector search default limit * fix: Update Search integration test snapshots * fix finding duckdb index * feat: Enhance DuckDB vector query handling for empty projections and filter pushdown * fix: Update Search integration test snapshots * fixes * fixes * better docs * chore: update datafusion-table-providers to add ignored_index_prefixes Pin to fork branch spiceai-hnsw-index-drift which adds `TableDefinition::add_ignored_index_prefix` so externally-managed HNSW indexes (named `__spice_vss_*`) are excluded from the DuckDB writer's index drift check, preventing spurious refresh failures. * chore: update datafusion-table-providers to upstream merged commit Switch from Jeadie fork back to datafusion-contrib upstream at df7dbc64, which includes the merged ignored_index_prefixes fix. * formatting * clippy * fix clippy: return closure result directly instead of let binding --------- Co-authored-by: jeadie <jack@spice.ai> Co-authored-by: Spice Snapshot Update Bot <spiceaibot@spice.ai> Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: Evgenii Khramkov <evgenii@spice.ai> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: claudespice <claude@spice.ai> Co-authored-by: Sergei Grebnov <sergei.grebnov@gmail.com> Co-authored-by: Copilot <copilot@github.com> Co-authored-by: William <98815791+peasee@users.noreply.github.com>
1 parent e28b42c commit ebb41f1

564 files changed

Lines changed: 9866 additions & 71 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Cargo.lock

Lines changed: 5 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/runtime-datafusion-index/src/lib.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,5 +49,16 @@ pub trait Index: Debug + Send + Sync + 'static {
4949
Ok(batches)
5050
}
5151

52+
/// Called after data has been written via the [`TableSink`] path (full refresh or append).
53+
///
54+
/// Default is a no-op. Implementations use this to create or verify persistent structures
55+
/// (e.g. a vector HNSW index) after each write. Using `IF NOT EXISTS` semantics makes it
56+
/// safe to call on both overwrite (recreates on new table) and append (no-op if index
57+
/// already exists). Not called for CDC writes — those maintain indexes automatically via
58+
/// `DuckDB` VSS on each insert.
59+
async fn on_write_complete(&self) -> Result<()> {
60+
Ok(())
61+
}
62+
5263
fn as_any(&self) -> &dyn Any;
5364
}

crates/runtime/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,7 @@ duckdb = [
294294
"data_components/duckdb",
295295
"datafusion-optimizer-rules/duckdb",
296296
"runtime-acceleration/duckdb",
297+
"search/duckdb",
297298
]
298299
dynamodb = [
299300
"dep:aws-sdk-dynamodb",

crates/runtime/src/accelerated_table/mod.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,12 @@ pub enum Error {
181181
#[snafu(display("Failed to construct data for the accelerated dataset: {source}"))]
182182
FailedToBuildRecordBatch { source: ArrowError },
183183

184+
#[snafu(display("Failed to process upsert batch for dataset {dataset_name}: {reason}"))]
185+
InvalidUpsertPrimaryKeys {
186+
dataset_name: String,
187+
reason: String,
188+
},
189+
184190
#[snafu(display("No primary keys defined for dataset {dataset_name}"))]
185191
NoPrimaryKeysDefined { dataset_name: String },
186192
}
@@ -1083,6 +1089,15 @@ impl AcceleratedTable {
10831089
Arc::clone(&self.accelerator)
10841090
}
10851091

1092+
#[must_use]
1093+
pub(crate) fn get_accelerator_ref(&self) -> &Arc<dyn TableProvider> {
1094+
&self.accelerator
1095+
}
1096+
1097+
pub(crate) fn set_accelerator(&mut self, accelerator: Arc<dyn TableProvider>) {
1098+
self.accelerator = accelerator;
1099+
}
1100+
10861101
/// Add a child accelerator that should receive cached data when this parent stores new cache entries.
10871102
/// This is used for localpod caching synchronization.
10881103
pub async fn add_synchronized_child(&self, child_accelerator: Arc<dyn TableProvider>) {

crates/runtime/src/accelerated_table/sink/table.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use datafusion::{
2222
physical_plan::collect, prelude::SessionContext,
2323
};
2424
use runtime_datafusion::execution_plan::schema_cast::SchemaCastScanExec;
25+
use runtime_datafusion_index::IndexedTableProvider;
2526
use runtime_table_partition::provider::PartitionTableProvider;
2627
use util::RetryError;
2728

@@ -123,6 +124,20 @@ impl TableSink {
123124
// Don't fail the write - data was successfully written, index rebuild is best-effort
124125
}
125126
}
127+
128+
// Call on_write_complete on every Index in an IndexedTableProvider.
129+
// Uses IF NOT EXISTS semantics: creates index after overwrite (new table),
130+
// no-op after append (index already exists). CDC skips this path entirely.
131+
if let Some(indexed) = provider.as_any().downcast_ref::<IndexedTableProvider>() {
132+
for index in indexed.get_all_indexes() {
133+
if let Err(e) = index.on_write_complete().await {
134+
tracing::warn!(
135+
"TableSink: on_write_complete failed for index '{}': {e}. Index may be stale until next refresh.",
136+
index.name()
137+
);
138+
}
139+
}
140+
}
126141
}
127142

128143
tracing::debug!(

crates/runtime/src/embeddings/connector.rs

Lines changed: 104 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ use crate::changes::Indexes;
1818
use crate::changes::index_change_envelope;
1919
use crate::component::ComponentInitialization;
2020
use crate::component::dataset::Dataset;
21+
#[cfg(feature = "duckdb")]
22+
use crate::component::dataset::acceleration::Engine;
2123
use crate::component::metrics::MetricsProvider;
2224
use crate::dataconnector::{DataConnector, DataConnectorError, DataConnectorResult};
2325
use crate::embeddings::execution_plan::{
@@ -37,6 +39,8 @@ use runtime_datafusion_index::IndexedTableProvider;
3739
use search::generation::text_search::index::FullTextDatabaseIndex;
3840
use search::index::VectorScanTableProvider;
3941
use spicepod::component::embeddings::ColumnEmbeddingConfig;
42+
#[cfg(feature = "duckdb")]
43+
use spicepod::{semantic::ColumnLevelEmbeddingConfig, vector::VectorStore};
4044
use std::any::Any;
4145
use std::sync::Arc;
4246
use tokio::sync::{Mutex, RwLock};
@@ -93,6 +97,21 @@ impl EmbeddingConnector {
9397
if let Some(vector_engine) = &dataset.vectors
9498
&& vector_engine.enabled
9599
{
100+
#[cfg(feature = "duckdb")]
101+
if vector_engine.engine.as_deref() == Some("duckdb")
102+
&& !dataset.acceleration.as_ref().is_some_and(|acceleration| {
103+
acceleration.engine.to_unpartitioned() == Engine::DuckDB
104+
})
105+
{
106+
return Err(DataConnectorError::InvalidConfigurationSourceOnly {
107+
dataconnector: dataset.source().to_string(),
108+
connector_component: dataset.into(),
109+
source: Box::<dyn std::error::Error + Send + Sync>::from(
110+
"DuckDB vector engine requires DuckDB acceleration. Configure the dataset with `acceleration.engine: duckdb`.",
111+
),
112+
});
113+
}
114+
96115
return wrap_table_as_index(
97116
&dataset.runtime().datafusion().ctx,
98117
&self.embedding_models,
@@ -248,7 +267,30 @@ impl DataConnector for EmbeddingConnector {
248267
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
249268
self.inner_connector
250269
.on_accelerated_table_registration(dataset, accelerated_table)
251-
.await
270+
.await?;
271+
272+
#[cfg(feature = "duckdb")]
273+
if let Some(vector_engine) = duckdb_vector_store_for_accelerated_table(dataset) {
274+
let embedding_columns = duckdb_embedding_columns(dataset);
275+
if embedding_columns.is_empty() {
276+
return Ok(());
277+
}
278+
279+
let accelerator = accelerated_table.get_accelerator();
280+
let indexed_accelerator =
281+
crate::embeddings::index::duckdb::wrap_accelerator_with_duckdb_vector_indexes(
282+
&dataset.name,
283+
embedding_columns,
284+
&vector_engine,
285+
accelerator,
286+
Arc::clone(&self.embedding_models),
287+
Arc::clone(&self.secrets),
288+
)
289+
.await?;
290+
accelerated_table.set_accelerator(indexed_accelerator);
291+
}
292+
293+
Ok(())
252294
}
253295

254296
fn supports_changes_stream(&self) -> bool {
@@ -389,6 +431,67 @@ impl DataConnector for EmbeddingConnector {
389431
}
390432
}
391433

434+
#[cfg(feature = "duckdb")]
435+
fn duckdb_vector_store_for_accelerated_table(dataset: &Dataset) -> Option<VectorStore> {
436+
if let Some(vector_engine) = &dataset.vectors
437+
&& vector_engine.enabled
438+
&& vector_engine.engine.as_deref() == Some("duckdb")
439+
{
440+
return Some(vector_engine.clone());
441+
}
442+
443+
if !dataset.has_embeddings()
444+
|| !dataset
445+
.acceleration
446+
.as_ref()
447+
.is_some_and(|acceleration| acceleration.engine.to_unpartitioned() == Engine::DuckDB)
448+
{
449+
return None;
450+
}
451+
452+
let acceleration = dataset.acceleration.as_ref()?;
453+
crate::embeddings::index::duckdb::vector_store_from_embedding_params(&acceleration.params)
454+
}
455+
456+
#[cfg(feature = "duckdb")]
457+
fn duckdb_embedding_columns(dataset: &Dataset) -> Vec<(String, ColumnLevelEmbeddingConfig)> {
458+
let mut embedding_columns = dataset
459+
.embeddings
460+
.iter()
461+
.map(|embedding| {
462+
(
463+
embedding.column.clone(),
464+
ColumnLevelEmbeddingConfig {
465+
model: embedding.model.clone(),
466+
chunking: embedding.chunking.clone(),
467+
row_ids: embedding.primary_keys.clone(),
468+
vector_size: embedding.vector_size,
469+
aggregation: embedding.aggregation,
470+
max_elements_per_row: embedding.max_elements_per_row,
471+
},
472+
)
473+
})
474+
.collect::<Vec<_>>();
475+
476+
for column in &dataset.columns {
477+
// Must be `last()` to mimic what model `EmbeddingTable`'s HashMap ends up with.
478+
let Some(embedding) = column.embeddings.last() else {
479+
continue;
480+
};
481+
482+
if let Some((_, existing)) = embedding_columns
483+
.iter_mut()
484+
.find(|(column_name, _)| column_name == &column.name)
485+
{
486+
*existing = embedding.clone();
487+
} else {
488+
embedding_columns.push((column.name.clone(), embedding.clone()));
489+
}
490+
}
491+
492+
embedding_columns
493+
}
494+
392495
fn underlying_federated_table_for_indexed_table(
393496
src_table_provider: &Arc<dyn TableProvider>,
394497
) -> Option<Arc<FederatedTable>> {

0 commit comments

Comments
 (0)