@@ -42,6 +42,25 @@ pub struct Stage {
4242 /// upstream (DESCRIBE) before assembling the final INSERT ... ON
4343 /// CONFLICT statement.
4444 pub upsert : Option < UpsertSpec > ,
45+ /// For xf.ai.text_search: in DuckDB v1.5.x the fts PRAGMA can't see
46+ /// tables created in the same -c invocation. The planner records
47+ /// the spec; the executor runs two CLI calls (stage then index +
48+ /// query) so the PRAGMA sees committed state. Works unchanged on
49+ /// v1.4 too.
50+ pub text_search : Option < TextSearchSpec > ,
51+ }
52+
/// Planner-side description of one `xf.ai.text_search` node. It is
/// produced by `build_text_search_spec` and stored on the stage in
/// place of SQL so the executor can run the search in separate steps.
53+ #[ derive( Debug , Clone ) ]
54+ pub struct TextSearchSpec {
/// Name of the main upstream input, copied verbatim from the node's
/// inputs; this is the relation whose rows are searched.
55+ pub from_view : String ,
/// Identifier column taken from the node's form.
/// NOTE(review): presumably the document-id argument for the fts
/// index and the BM25 match call; executor usage is not visible here.
56+ pub id_col : String ,
/// Text columns taken from the node's form.
/// NOTE(review): presumably the columns the fts index is built over.
57+ pub text_cols : Vec < String > ,
/// Search string taken from the node's form. It is stored raw, so
/// whoever embeds it in SQL is responsible for escaping it.
58+ pub query : String ,
/// Optional result cap read from the `topK` property. The planner
/// filters out zero, so `Some(k)` always has `k > 0`.
59+ pub top_k : Option < u64 > ,
/// Name of the score column, read from `outputColumn`; the planner
/// falls back to `"score"` when the property is missing or empty.
60+ pub output_col : String ,
61+ /// Sanitized staging table name (so PRAGMA can reference a valid
62+ /// SQL identifier even when the node id has special characters).
/// The planner builds it as `_fts_` followed by the node id with every
/// non-ASCII-alphanumeric character replaced by `_`.
63+ pub staging_table : String ,
4564}
4665
4766#[ derive( Debug , Clone ) ]
@@ -341,6 +360,7 @@ fn build_stage(
341360 let mut sink_path: Option < String > = None ;
342361 let mut sink_mode: Option < String > = None ;
343362 let mut upsert: Option < UpsertSpec > = None ;
363+ let mut text_search: Option < TextSearchSpec > = None ;
344364 // ATTACH statements for external-DB nodes (DuckDB/SQLite). Each stage
345365 // runs in its own CLI process, so fixed aliases are collision-free.
346366 let attach = attach_prelude ( component_id, & props) ;
@@ -408,15 +428,14 @@ fn build_stage(
408428 } ) ?;
409429 ( format ! ( "{}{}" , attach, sql) , StageKind :: View , None )
410430 } else if component_id == "xf.ai.text_search" {
411- // Full-Text Search needs a stable named table for
412- // create_fts_index, so we materialize the upstream into a temp
413- // table, build the BM25 index on it, then SELECT through the
414- // index. Multi-statement, so it bypasses the standard view
415- // wrapping and emits its own CREATE TABLE for node.id.
416- let sql = build_text_search ( & node. id , inputs, & props) . map_err ( |e| {
431+ // Full-Text Search runs as a two-step path in the executor (the
432+ // v1.5 fts PRAGMA can't see tables created in the same -c
433+ // invocation). The planner records the spec; sql stays empty.
434+ let spec = build_text_search_spec ( & node. id , inputs, & props) . map_err ( |e| {
417435 EngineError :: Config ( format ! ( "{} ({} / {}): {}" , node. data. label, component_id, node. id, e) )
418436 } ) ?;
419- ( format ! ( "{}{}" , attach, sql) , StageKind :: View , None )
437+ text_search = Some ( spec) ;
438+ ( String :: new ( ) , StageKind :: View , None )
420439 } else {
421440 let body = build_view_sql ( component_id, & props, inputs) . map_err ( |e| {
422441 EngineError :: Config ( format ! ( "{} ({} / {}): {}" , node. data. label, component_id, node. id, e) )
@@ -453,6 +472,7 @@ fn build_stage(
453472 sink_path,
454473 sink_mode,
455474 upsert,
475+ text_search,
456476 } )
457477}
458478
@@ -2659,13 +2679,9 @@ fn build_avro_source(props: &JsonValue) -> String {
26592679 format ! ( "SELECT * FROM read_avro('{}')" , sql_escape( & path) )
26602680}
26612681
2662- /// Full-Text Search via the DuckDB fts extension. The extension's
2663- /// API (PRAGMA create_fts_index + a per-table fts_main_<table>.match_bm25
2664- /// function) needs a named table to index, so this materializes the
2665- /// upstream into a temp table, builds the BM25 index on it, then
2666- /// produces `<node>` with the original columns + a score column.
2667- /// Optionally limits to the top-K matches.
2668- fn build_text_search ( node_id : & str , inputs : & NodeInputs , props : & JsonValue ) -> Result < String , String > {
2682+ /// Validate the text-search form and produce the spec the executor
2683+ /// uses to run the two CLI calls (stage table -> index + final query).
2684+ fn build_text_search_spec ( node_id : & str , inputs : & NodeInputs , props : & JsonValue ) -> Result < TextSearchSpec , String > {
26692685 let upstream = inputs
26702686 . main ( )
26712687 . ok_or_else ( || missing_input_msg ( "xf.ai.text_search" ) ) ?;
@@ -2683,49 +2699,23 @@ fn build_text_search(node_id: &str, inputs: &NodeInputs, props: &JsonValue) -> R
26832699 . get ( "topK" )
26842700 . and_then ( |v| v. as_u64 ( ) )
26852701 . filter ( |k| * k > 0 ) ;
2686- let output = string_prop ( props, "outputColumn" )
2702+ let output_col = string_prop ( props, "outputColumn" )
26872703 . filter ( |s| !s. is_empty ( ) )
26882704 . unwrap_or_else ( || "score" . into ( ) ) ;
2689-
2690- // Sanitized temp-table suffix from the node id (table names must
2691- // be valid SQL identifiers; the upstream node id may not be).
26922705 let suffix: String = node_id
26932706 . chars ( )
26942707 . map ( |c| if c. is_ascii_alphanumeric ( ) { c } else { '_' } )
26952708 . collect ( ) ;
2696- let temp_table = format ! ( "_fts_{}" , suffix) ;
2697- let index_schema = format ! ( "fts_main_{}" , temp_table) ;
2698-
2699- let text_args = text_cols
2700- . iter ( )
2701- . map ( |c| format ! ( "'{}'" , c. replace( '\'' , "''" ) ) )
2702- . collect :: < Vec < _ > > ( )
2703- . join ( ", " ) ;
2704- let match_expr = format ! (
2705- "{}.match_bm25({}, '{}')" ,
2706- index_schema,
2707- quote_ident( & id_col) ,
2708- query. replace( '\'' , "''" )
2709- ) ;
2710- let order_limit = match top_k {
2711- Some ( k) => format ! ( " ORDER BY {} DESC LIMIT {}" , quote_ident( & output) , k) ,
2712- None => String :: new ( ) ,
2713- } ;
2714- Ok ( format ! (
2715- "DROP TABLE IF EXISTS {temp}; \
2716- CREATE TEMP TABLE {temp} AS SELECT * FROM {up}; \
2717- PRAGMA create_fts_index('{temp_raw}', '{id_col}', {text_args}); \
2718- CREATE OR REPLACE TABLE {node} AS \
2719- SELECT *, {match} AS {out} FROM {temp} \
2720- WHERE {match} IS NOT NULL{order_limit}",
2721- temp = quote_ident( & temp_table) ,
2722- temp_raw = temp_table,
2723- up = quote_ident( upstream) ,
2724- id_col = id_col,
2725- node = quote_ident( node_id) ,
2726- match = match_expr,
2727- out = quote_ident( & output) ,
2728- ) )
2709+ let staging_table = format ! ( "_fts_{}" , suffix) ;
2710+ Ok ( TextSearchSpec {
2711+ from_view : upstream. to_string ( ) ,
2712+ id_col,
2713+ text_cols,
2714+ query,
2715+ top_k,
2716+ output_col,
2717+ staging_table,
2718+ } )
27292719}
27302720
27312721/// Vector Similarity Search via the DuckDB vss extension. Adds a
0 commit comments