query hummock stats * need to query for upstream tables being scanned instead #20463

Draft: wants to merge 1 commit into base graphite-base/20463

src/frontend/src/expr/table_function.rs (2 changes: 1 addition & 1 deletion)
@@ -589,7 +589,7 @@ impl TableFunction {
args: vec![],
return_type: DataType::Struct(StructType::new(vec![
("job_id".to_owned(), DataType::Int32),
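// Declared as Decimal: the rewrite rule below now fills this column with backfilled rows divided by the job's total key count.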
-("row_count".to_owned(), DataType::Int64),
+("row_count".to_owned(), DataType::Decimal),
])),
function_type: TableFunctionType::InternalBackfillProgress,
user_defined: None,

@@ -20,16 +20,20 @@ use risingwave_common::catalog::{Field, Schema, StreamJobStatus, internal_table_
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_expr::aggregate::AggType;
pub use risingwave_pb::expr::agg_call::PbKind as PbAggKind;
+pub use risingwave_pb::plan_common::JoinType;

use super::{ApplyResult, BoxedRule, FallibleRule};
use crate::TableCatalog;
use crate::catalog::catalog_service::CatalogReadGuard;
use crate::catalog::table_catalog::TableType;
-use crate::expr::{AggCall, ExprImpl, InputRef, Literal, OrderBy, TableFunctionType};
+use crate::expr::{
+AggCall, ExprImpl, ExprType, FunctionCall, InputRef, Literal, OrderBy, TableFunctionType,
+};
use crate::optimizer::PlanRef;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{
-LogicalAgg, LogicalProject, LogicalScan, LogicalTableFunction, LogicalUnion, LogicalValues,
+LogicalAgg, LogicalJoin, LogicalProject, LogicalScan, LogicalTableFunction, LogicalUnion,
+LogicalValues,
};
use crate::utils::{Condition, GroupBy};

@@ -60,12 +64,14 @@ impl FallibleRule for TableFunctionToInternalBackfillProgressRule {
}

let mut counts = Vec::with_capacity(backfilling_tables.len());
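// Also remember each job's id so the rule can later fetch that job's total key count from the hummock table stats.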
+let mut backfilling_job_ids = vec![];
for table in backfilling_tables {
let Some(job_id) = table.job_id else {
return ApplyResult::Err(
anyhow!("`job_id` column not found in backfill table").into(),
);
};
+backfilling_job_ids.push(job_id);
let Some(row_count_column_index) =
table.columns.iter().position(|c| c.name() == "row_count")
else {
@@ -119,7 +125,7 @@ impl FallibleRule for TableFunctionToInternalBackfillProgressRule {
}))]);
let (agg, _rewritten_select_exprs, _rewritten_having_exprs) =
LogicalAgg::create(select_exprs, group_key, None, union.into())?;
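// The aggregated per-job backfilled row counts; this plan becomes the left input of the join with the per-job totals built below.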
-let project = LogicalProject::new(
+let current_counts = LogicalProject::new(
agg,
vec![
ExprImpl::InputRef(Box::new(InputRef {
@@ -133,6 +139,91 @@ impl FallibleRule for TableFunctionToInternalBackfillProgressRule {
.cast_explicit(DataType::Int64)?,
],
);

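// For every backfilling job, read its total key count from the hummock table stats and wrap it in a one-row Values plan; the rows are unioned into a single (job_id, total_key_count) relation.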
let total_counts = {
let catalog = plan.ctx().session_ctx().env().catalog_reader().read_guard();
let mut total_counts = vec![];
for job_id in backfilling_job_ids {
let total_key_count = if let Some(stats) =
catalog.table_stats().table_stats.get(&(job_id.table_id))
{
stats.total_key_count
} else {
return ApplyResult::Err(
anyhow!("Table stats not found for table_id: {}", job_id.table_id).into(),
);
};
let job_id_expr = ExprImpl::Literal(Box::new(Literal::new(
Some(ScalarImpl::Int32(job_id.table_id as i32)),
DataType::Int32,
)));
let total_key_count_expr = ExprImpl::Literal(Box::new(Literal::new(
Some(ScalarImpl::Int64(total_key_count as i64)),
DataType::Int64,
)));
let total_count = LogicalValues::new(
vec![vec![job_id_expr, total_key_count_expr]],
Schema::new(vec![
Field::new("job_id", DataType::Int32),
Field::new("total_key_count", DataType::Int64),
]),
plan.ctx().clone(),
);
total_counts.push(total_count.into());
}
LogicalUnion::new(true, total_counts)
};

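// Inner join the current counts with the totals on job_id (column 0 of the left input equals column 2, the right input's job_id).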
let join = {
let conjunctions = vec![ExprImpl::FunctionCall(Box::new(FunctionCall::new(
ExprType::Equal,
vec![
ExprImpl::InputRef(Box::new(InputRef {
index: 0,
data_type: DataType::Int32,
})),
ExprImpl::InputRef(Box::new(InputRef {
index: 2,
data_type: DataType::Int32,
})),
],
)?))];
let condition = Condition { conjunctions };
LogicalJoin::new(
current_counts.into(),
total_counts.into(),
JoinType::Inner,
condition,
)
};

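// Keep job_id and report progress as row_count / total_key_count, with both operands cast to Decimal to match the function's declared return type.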
let project = {
let op1 = ExprImpl::InputRef(Box::new(InputRef {
index: 1,
data_type: DataType::Int64,
}))
.cast_implicit(DataType::Decimal)?;
let op2 = ExprImpl::InputRef(Box::new(InputRef {
index: 3,
data_type: DataType::Int64,
}))
.cast_implicit(DataType::Decimal)?;
let div_expr = ExprImpl::FunctionCall(Box::new(FunctionCall::new(
ExprType::Divide,
vec![op1, op2],
)?));
LogicalProject::new(
join.into(),
vec![
ExprImpl::InputRef(Box::new(InputRef {
index: 0,
data_type: DataType::Int32,
})),
div_expr,
],
)
};

ApplyResult::Ok(project.into())
}
}