Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions src/query/service/src/physical_plans/physical_r_cte_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ pub struct RecursiveCteScan {
meta: PhysicalPlanMeta,
pub output_schema: DataSchemaRef,
pub table_name: String,
pub logical_recursive_cte_id: Option<u32>,
pub stat: PlanStatsInfo,
pub exec_id: Option<u64>,
}

#[typetag::serde]
Expand All @@ -60,8 +60,8 @@ impl IPhysicalPlan for RecursiveCteScan {
meta: self.meta.clone(),
output_schema: self.output_schema.clone(),
table_name: self.table_name.clone(),
logical_recursive_cte_id: self.logical_recursive_cte_id,
stat: self.stat.clone(),
exec_id: self.exec_id,
})
}

Expand All @@ -73,7 +73,6 @@ impl IPhysicalPlan for RecursiveCteScan {
builder.ctx.clone(),
output_port.clone(),
self.table_name.clone(),
self.exec_id,
)
},
1,
Expand All @@ -92,8 +91,8 @@ impl PhysicalPlanBuilder {
meta: PhysicalPlanMeta::new("RecursiveCteScan"),
output_schema: DataSchemaRefExt::create(recursive_cte_scan.fields.clone()),
table_name: recursive_cte_scan.table_name.clone(),
logical_recursive_cte_id: recursive_cte_scan.logical_recursive_cte_id,
stat: stat_info,
exec_id: None,
}))
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/query/service/src/physical_plans/physical_union_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub struct UnionAll {
pub right_outputs: Vec<(Symbol, Option<RemoteExpr>)>,
pub schema: DataSchemaRef,
pub cte_scan_names: Vec<String>,
pub logical_recursive_cte_id: Option<u32>,

// Only used for explain
pub stat_info: Option<PlanStatsInfo>,
Expand Down Expand Up @@ -109,6 +110,7 @@ impl IPhysicalPlan for UnionAll {
right_outputs: self.right_outputs.clone(),
schema: self.schema.clone(),
cte_scan_names: self.cte_scan_names.clone(),
logical_recursive_cte_id: self.logical_recursive_cte_id,
stat_info: self.stat_info.clone(),
})
}
Expand Down Expand Up @@ -299,6 +301,7 @@ impl PhysicalPlanBuilder {
schema: DataSchemaRefExt::create(fields),

cte_scan_names: union_all.cte_scan_names.clone(),
logical_recursive_cte_id: union_all.logical_recursive_cte_id,
stat_info: Some(stat_info),
}))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::sync::Arc;

use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::DataBlock;
use databend_common_pipeline::core::OutputPort;
Expand All @@ -31,15 +30,14 @@ pub struct TransformRecursiveCteScan {
ctx: Arc<QueryContext>,
table: Option<Arc<dyn Table>>,
table_name: String,
exec_id: Option<u64>,
reader_id: Option<u64>,
}

impl TransformRecursiveCteScan {
pub fn create(
ctx: Arc<QueryContext>,
output_port: Arc<OutputPort>,
table_name: String,
exec_id: Option<u64>,
) -> Result<ProcessorPtr> {
AsyncSourcer::create(
ctx.get_scan_progress(),
Expand All @@ -48,7 +46,7 @@ impl TransformRecursiveCteScan {
ctx,
table: None,
table_name,
exec_id,
reader_id: None,
},
)
}
Expand Down Expand Up @@ -78,22 +76,29 @@ impl AsyncSource for TransformRecursiveCteScan {
.as_any()
.downcast_ref::<RecursiveCteMemoryTable>()
.unwrap();
let data = if let Some(id) = self.exec_id {
memory_table.take_by_id(id)
let reader_id = if let Some(reader_id) = self.reader_id {
reader_id
} else {
return Err(ErrorCode::Internal(format!(
"Internal, TransformRecursiveCteScan not exec_id on CTE: {}",
self.table_name,
)));
let reader_id = memory_table.register_reader();
self.reader_id = Some(reader_id);
reader_id
};
let data = memory_table.take_generation_blocks(reader_id);
if data.is_empty() {
return Ok(None);
}
let data = DataBlock::concat(&data)?;
if data.is_empty() {
Ok(None)
} else {
Ok(Some(data))
Ok((data.num_rows() > 0).then_some(data))
}

async fn on_finish(&mut self) -> Result<()> {
if let (Some(table), Some(reader_id)) = (&self.table, self.reader_id.take()) {
let memory_table = table
.as_any()
.downcast_ref::<RecursiveCteMemoryTable>()
.unwrap();
memory_table.unregister_reader(reader_id);
}
Ok(())
}
}
Loading
Loading