Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
dbd889c
refactor: stream recursive cte memory blocks by chunk
KKould Mar 11, 2026
04b91e4
refactor: remove exec_id for TransformRecursiveCteScan
KKould Mar 11, 2026
d5d5eef
refactor: stream recursive cte source by block
KKould Mar 11, 2026
6a21920
fix: stabilize recursive CTE reuse across decorrelated subqueries
KKould Mar 12, 2026
66f55aa
chore: codefmt
KKould Mar 12, 2026
f3ae46d
Merge branch 'main' into refacotr/r_cte_stream
KKould Mar 13, 2026
ea2115e
fix: stabilize TTC readiness in hybrid sqllogictests
KKould Mar 13, 2026
e8272de
fix: reacquire recursive CTE cache ownership after aborted population
KKould Mar 13, 2026
543b1d5
fix: allocate recursive CTE ids from query-global metadata
KKould Mar 13, 2026
92de3e8
chore: codefmt
KKould Mar 13, 2026
f7e3547
chore: drop recursive CTE stress case and TTC readiness tweaks
KKould Mar 13, 2026
d076cb0
fix: reset all recursive scan tables when reacquiring cache ownership
KKould Mar 13, 2026
bc0c0ae
fix: consume recursive CTE scan output by generation
KKould Mar 13, 2026
18df7e1
fix: decouple recursive cte step execution from source polling
KKould Mar 14, 2026
0f6c785
chore: fix ci
KKould Mar 14, 2026
5447b8c
Merge branch 'main' into refacotr/r_cte_stream
KKould Mar 14, 2026
1c02b40
Merge branch 'main' into refacotr/r_cte_stream
KKould Mar 15, 2026
7414a96
fix: share recursive cte cleanup state across child contexts
KKould Mar 16, 2026
9a1c550
fix: use event notification for recursive CTE cache replay
KKould Mar 16, 2026
5e4926b
chore: codefmt
KKould Mar 16, 2026
035ef82
fix: separate materialized and recursive CTE temp table cleanup
KKould Mar 16, 2026
04274d0
fix: clean up recursive cte temp tables correctly
KKould Mar 16, 2026
48e6914
chore: rollback comment
KKould Mar 16, 2026
ed095be
fix: stabilize recursive cte replay cache selection
KKould Mar 16, 2026
fb83ed4
fix: always run recursive cte temp-table cleanup
KKould Mar 16, 2026
44987bf
drop recursive cte tables through catalog cleanup
KKould Mar 16, 2026
c885c3e
fix: clean up recursive cte temp tables correctly
KKould Mar 16, 2026
7c11375
fix: reset all recursive scan tables on cache reacquire
KKould Mar 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions src/query/service/src/physical_plans/physical_r_cte_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ pub struct RecursiveCteScan {
meta: PhysicalPlanMeta,
pub output_schema: DataSchemaRef,
pub table_name: String,
pub logical_recursive_cte_id: Option<u32>,
pub stat: PlanStatsInfo,
pub exec_id: Option<u64>,
}

#[typetag::serde]
Expand All @@ -60,8 +60,8 @@ impl IPhysicalPlan for RecursiveCteScan {
meta: self.meta.clone(),
output_schema: self.output_schema.clone(),
table_name: self.table_name.clone(),
logical_recursive_cte_id: self.logical_recursive_cte_id,
stat: self.stat.clone(),
exec_id: self.exec_id,
})
}

Expand All @@ -73,7 +73,6 @@ impl IPhysicalPlan for RecursiveCteScan {
builder.ctx.clone(),
output_port.clone(),
self.table_name.clone(),
self.exec_id,
)
},
1,
Expand All @@ -92,8 +91,8 @@ impl PhysicalPlanBuilder {
meta: PhysicalPlanMeta::new("RecursiveCteScan"),
output_schema: DataSchemaRefExt::create(recursive_cte_scan.fields.clone()),
table_name: recursive_cte_scan.table_name.clone(),
logical_recursive_cte_id: recursive_cte_scan.logical_recursive_cte_id,
stat: stat_info,
exec_id: None,
}))
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/query/service/src/physical_plans/physical_union_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub struct UnionAll {
pub right_outputs: Vec<(Symbol, Option<RemoteExpr>)>,
pub schema: DataSchemaRef,
pub cte_scan_names: Vec<String>,
pub logical_recursive_cte_id: Option<u32>,

// Only used for explain
pub stat_info: Option<PlanStatsInfo>,
Expand Down Expand Up @@ -109,6 +110,7 @@ impl IPhysicalPlan for UnionAll {
right_outputs: self.right_outputs.clone(),
schema: self.schema.clone(),
cte_scan_names: self.cte_scan_names.clone(),
logical_recursive_cte_id: self.logical_recursive_cte_id,
stat_info: self.stat_info.clone(),
})
}
Expand Down Expand Up @@ -299,6 +301,7 @@ impl PhysicalPlanBuilder {
schema: DataSchemaRefExt::create(fields),

cte_scan_names: union_all.cte_scan_names.clone(),
logical_recursive_cte_id: union_all.logical_recursive_cte_id,
stat_info: Some(stat_info),
}))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::sync::Arc;

use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::DataBlock;
use databend_common_pipeline::core::OutputPort;
Expand All @@ -31,15 +30,14 @@ pub struct TransformRecursiveCteScan {
ctx: Arc<QueryContext>,
table: Option<Arc<dyn Table>>,
table_name: String,
exec_id: Option<u64>,
reader_id: Option<u64>,
}

impl TransformRecursiveCteScan {
pub fn create(
ctx: Arc<QueryContext>,
output_port: Arc<OutputPort>,
table_name: String,
exec_id: Option<u64>,
) -> Result<ProcessorPtr> {
AsyncSourcer::create(
ctx.get_scan_progress(),
Expand All @@ -48,7 +46,7 @@ impl TransformRecursiveCteScan {
ctx,
table: None,
table_name,
exec_id,
reader_id: None,
},
)
}
Expand Down Expand Up @@ -78,22 +76,25 @@ impl AsyncSource for TransformRecursiveCteScan {
.as_any()
.downcast_ref::<RecursiveCteMemoryTable>()
.unwrap();
let data = if let Some(id) = self.exec_id {
memory_table.take_by_id(id)
let reader_id = if let Some(reader_id) = self.reader_id {
reader_id
} else {
return Err(ErrorCode::Internal(format!(
"Internal, TransformRecursiveCteScan not exec_id on CTE: {}",
self.table_name,
)));
let reader_id = memory_table.register_reader();
self.reader_id = Some(reader_id);
reader_id
};
if data.is_empty() {
return Ok(None);
}
let data = DataBlock::concat(&data)?;
if data.is_empty() {
Ok(None)
} else {
Ok(Some(data))
let data = memory_table.take_one_block(reader_id);
Ok(data.filter(|block| block.num_rows() > 0))
}

async fn on_finish(&mut self) -> Result<()> {
if let (Some(table), Some(reader_id)) = (&self.table, self.reader_id.take()) {
let memory_table = table
.as_any()
.downcast_ref::<RecursiveCteMemoryTable>()
.unwrap();
memory_table.unregister_reader(reader_id);
}
Ok(())
}
}
Loading
Loading