Skip to content

Commit 8559148

Browse files
committed
core/translate: split window step emission by per-source-row vs per-buffered-row pattern
Window functions fall into two categories with respect to when their AggStep fires: aggregate-like functions (sum/count/avg as windows, and the upcoming rank-family functions) count source rows as they arrive from the input subquery and capture a single value per peer-group flush; row_number and the upcoming positional functions (lag, lead, first/last/nth_value) step per buffered row at flush time because their value changes within a peer group. Express this as a WindowFunc::steps_per_source_row() classifier and split the two emit sites accordingly: - emit_aggregation_step (called per source row) handles aggregate window funcs and the rank-family; - emit_return_buffered_rows pre-loop captures AggValue once per flush for both categories above, and the per-row loop body steps + reads only the per-buffered-row category. No behavioral change: the rank-family branch is currently dead because none of those functions are wired in name resolution yet. row_number continues to emit via the per-buffered-row path. Mirrors how SQLite gates its xStep emission per function family rather than running a runtime peer-detection flag.
1 parent dff4841 commit 8559148

2 files changed

Lines changed: 165 additions & 61 deletions

File tree

core/function.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,66 @@ impl WindowFunc {
569569
),
570570
}
571571
}
572+
573+
/// Whether the function produces a single value shared by every row in a
574+
/// peer group (true) or a distinct value per row (false).
575+
///
576+
/// This controls where `emit_aggregation_step` and
577+
/// `emit_peer_group_flush` emit the function's `AggStep` and
578+
/// `AggValue` opcodes:
579+
///
580+
/// * true (rank-family): `AggStep` is emitted inside the input-scan loop
581+
/// in `emit_aggregation_step`, so it runs once for every row read from
582+
/// the subquery and advances the function's internal counter. `AggValue`
583+
/// is emitted once per peer-group flush in `emit_peer_group_flush`,
584+
/// before the loop over buffered rows, reading the accumulator into a
585+
/// result register. The loop then writes that one register's contents
586+
/// into the output for every buffered row in the group.
587+
/// * false (row_number, ntile, lag, lead, first/last/nth_value): nothing
588+
/// is emitted in `emit_aggregation_step`. Both `AggStep` and `AggValue`
589+
/// are emitted inside the per-row loop in `emit_peer_group_flush`,
590+
/// so they run once per buffered row and produce a distinct value for
591+
/// each output row.
592+
///
593+
/// Concrete example. Given `scores(name, score)`:
594+
///
595+
/// alice 100
596+
/// bob 100
597+
/// carol 90
598+
///
599+
/// `rank() OVER (ORDER BY score DESC)` returns true — alice and bob are
600+
/// peers on score=100 and share the same rank:
601+
///
602+
/// alice 100 1
603+
/// bob 100 1
604+
/// carol 90 3
605+
///
606+
/// `row_number() OVER (ORDER BY score DESC)` returns false — every row
607+
/// gets a distinct number even within the score=100 peer group:
608+
///
609+
/// alice 100 1
610+
/// bob 100 2
611+
/// carol 90 3
612+
///
613+
/// Functions returning true: `rank`, `dense_rank`, `percent_rank`,
614+
/// `cume_dist` — their value is determined entirely by where the peer
615+
/// group sits in the partition's ordering.
616+
/// Functions returning false: `row_number`, `ntile`, `lag`, `lead`,
617+
/// `first_value`, `last_value`, `nth_value` — all depend on a row's
618+
/// position within the partition/frame, not just its peer group.
619+
pub fn one_value_per_peer_group(&self) -> bool {
620+
match self {
621+
Self::Rank | Self::DenseRank | Self::PercentRank | Self::CumeDist => true,
622+
Self::RowNumber
623+
| Self::Ntile
624+
| Self::Lag
625+
| Self::Lead
626+
| Self::FirstValue
627+
| Self::LastValue
628+
| Self::NthValue
629+
| Self::External(_) => false,
630+
}
631+
}
572632
}
573633

574634
impl PartialEq for WindowFunc {

core/translate/window.rs

Lines changed: 105 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1040,57 +1040,86 @@ fn emit_aggregation_step(
10401040
registers: &WindowRegisters,
10411041
) -> crate::Result<()> {
10421042
for (i, func) in window.functions.iter().enumerate() {
1043-
let AccumulatorFunc::Agg(agg_func) = &func.func else {
1044-
continue;
1045-
};
1046-
// The aggregation step is performed incrementally as each row from the subquery is
1047-
// processed. Therefore, we don’t need to access the buffer table and can obtain argument
1048-
// values directly by evaluating the expressions that reference the subquery result columns.
1049-
// Use the rewritten form when available so the args reference the subquery
1050-
// rather than the (no-longer-visible) original tables.
1051-
let args = match func.current_expr() {
1052-
Expr::FunctionCall { args, .. } => args.iter().map(|a| (**a).clone()).collect(),
1053-
Expr::FunctionCallStar { .. } => vec![],
1054-
_ => unreachable!(
1055-
"All window functions should be either FunctionCall or FunctionCallStar expressions"
1056-
),
1057-
};
1058-
10591043
let reg_acc_start = registers.acc_start + i;
1060-
// FILTER controls whether the current input row contributes to the
1061-
// running aggregate; it does not suppress the output row itself.
1062-
let filter_skip_label = if let Some(filter_expr) =
1063-
func.rewritten.as_ref().and_then(|r| r.filter_expr.as_ref())
1064-
{
1065-
let label = program.allocate_label();
1066-
let filter_reg = program.alloc_register();
1067-
translate_expr(
1068-
program,
1069-
Some(&plan.table_references),
1070-
filter_expr,
1071-
filter_reg,
1072-
resolver,
1073-
)?;
1074-
program.emit_insn(Insn::IfNot {
1075-
reg: filter_reg,
1076-
target_pc: label,
1077-
jump_if_null: true,
1078-
});
1079-
Some(label)
1080-
} else {
1081-
None
1082-
};
1044+
match &func.func {
1045+
AccumulatorFunc::Agg(agg_func) => {
1046+
// The aggregation step is performed incrementally as each row from the subquery is
1047+
// processed. Therefore, we don’t need to access the buffer table and can obtain
1048+
// argument values directly by evaluating the expressions that reference the
1049+
// subquery result columns. Use the rewritten form when available so the args
1050+
// reference the subquery rather than the (no-longer-visible) original tables.
1051+
let args = match func.current_expr() {
1052+
Expr::FunctionCall { args, .. } => {
1053+
args.iter().map(|a| (**a).clone()).collect()
1054+
}
1055+
Expr::FunctionCallStar { .. } => vec![],
1056+
_ => unreachable!(
1057+
"All window functions should be either FunctionCall or FunctionCallStar expressions"
1058+
),
1059+
};
10831060

1084-
translate_aggregation_step(
1085-
program,
1086-
&plan.table_references,
1087-
AggArgumentSource::new_from_expression(agg_func, &args, &Distinctness::NonDistinct),
1088-
reg_acc_start,
1089-
resolver,
1090-
None,
1091-
)?;
1092-
if let Some(label) = filter_skip_label {
1093-
program.preassign_label_to_next_insn(label);
1061+
// FILTER controls whether the current input row contributes to the
1062+
// running aggregate; it does not suppress the output row itself.
1063+
let filter_skip_label = if let Some(filter_expr) =
1064+
func.rewritten.as_ref().and_then(|r| r.filter_expr.as_ref())
1065+
{
1066+
let label = program.allocate_label();
1067+
let filter_reg = program.alloc_register();
1068+
translate_expr(
1069+
program,
1070+
Some(&plan.table_references),
1071+
filter_expr,
1072+
filter_reg,
1073+
resolver,
1074+
)?;
1075+
program.emit_insn(Insn::IfNot {
1076+
reg: filter_reg,
1077+
target_pc: label,
1078+
jump_if_null: true,
1079+
});
1080+
Some(label)
1081+
} else {
1082+
None
1083+
};
1084+
1085+
translate_aggregation_step(
1086+
program,
1087+
&plan.table_references,
1088+
AggArgumentSource::new_from_expression(
1089+
agg_func,
1090+
&args,
1091+
&Distinctness::NonDistinct,
1092+
),
1093+
reg_acc_start,
1094+
resolver,
1095+
None,
1096+
)?;
1097+
if let Some(label) = filter_skip_label {
1098+
program.preassign_label_to_next_insn(label);
1099+
}
1100+
}
1101+
AccumulatorFunc::Window(win_func) if win_func.one_value_per_peer_group() => {
1102+
// Rank-family functions take no arguments. Emit `AggStep`
1103+
// here, inside the input-scan loop, so the function's
1104+
// internal counter and peer-detection state advance once for
1105+
// every row read from the subquery. `AggValue` is NOT emitted
1106+
// here — the accumulator is read later, once per peer-group
1107+
// flush, in `emit_peer_group_flush`.
1108+
program.emit_insn(Insn::AggStep {
1109+
acc_reg: reg_acc_start,
1110+
col: 0,
1111+
delimiter: 0,
1112+
func: AccumulatorFunc::Window(win_func.clone()),
1113+
comparator: None,
1114+
});
1115+
}
1116+
AccumulatorFunc::Window(_) => {
1117+
// row_number, ntile, lag, lead, first/last/nth_value: emit
1118+
// nothing here. Their value differs between rows of the same
1119+
// peer group, so both `AggStep` and `AggValue` are emitted
1120+
// later inside the loop over buffered rows in
1121+
// `emit_peer_group_flush`, once per output row.
1122+
}
10941123
}
10951124
}
10961125

@@ -1142,7 +1171,7 @@ pub fn emit_window_results(
11421171
pc_if_empty: label_empty,
11431172
});
11441173

1145-
emit_return_buffered_rows(program, window, t_ctx, plan)?;
1174+
emit_peer_group_flush(program, window, t_ctx, plan)?;
11461175

11471176
program.preassign_label_to_next_insn(label_empty);
11481177

@@ -1159,7 +1188,7 @@ pub fn emit_window_results(
11591188
Ok(())
11601189
}
11611190

1162-
fn emit_return_buffered_rows(
1191+
fn emit_peer_group_flush(
11631192
program: &mut ProgramBuilder,
11641193
window: &Window,
11651194
t_ctx: &mut TranslateCtx,
@@ -1173,15 +1202,23 @@ fn emit_return_buffered_rows(
11731202
..
11741203
} = t_ctx.meta_window.as_ref().expect("missing window metadata");
11751204

1176-
// Aggregate window functions: capture the running accumulator value once
1177-
// per flush. Every buffered row in the just-finished peer group sees the
1178-
// same value (RANGE UNBOUNDED PRECEDING TO CURRENT ROW semantics).
1205+
// For aggregate window functions and the rank-family, the accumulator
1206+
// has already been advanced once per source row during the input scan.
1207+
// Emit one `AggValue` per such function here, before entering the loop
1208+
// over buffered rows: this reads the accumulator into the result
1209+
// register exactly once per peer-group flush. The per-row loop below
1210+
// will copy that register's contents into the output for every buffered
1211+
// row in the group (RANGE UNBOUNDED PRECEDING TO CURRENT ROW semantics).
11791212
for (i, func) in window.functions.iter().enumerate() {
1180-
if let AccumulatorFunc::Agg(agg_func) = &func.func {
1213+
let value_pre_loop = match &func.func {
1214+
AccumulatorFunc::Agg(_) => true,
1215+
AccumulatorFunc::Window(w) => w.one_value_per_peer_group(),
1216+
};
1217+
if value_pre_loop {
11811218
program.emit_insn(Insn::AggValue {
11821219
acc_reg: registers.acc_start + i,
11831220
dest_reg: registers.acc_result_start + i,
1184-
func: AccumulatorFunc::Agg(agg_func.clone()),
1221+
func: func.func.clone(),
11851222
});
11861223
}
11871224
}
@@ -1197,13 +1234,20 @@ fn emit_return_buffered_rows(
11971234
let reg_result = registers.result_columns_start + i;
11981235
program.emit_column_or_rowid(cursors.csr_current, *col_idx, reg_result);
11991236
}
1200-
// Pure window functions (e.g. row_number) advance their accumulator and
1201-
// read it out once per buffered row as the buffer is replayed. Aggregate
1202-
// window functions instead advance their accumulator once per source row
1203-
// (in emit_aggregation_step) and only their final value is read out per
1204-
// emitted row (the AggValue loop earlier in this function).
1237+
// For row_number, ntile, lag, lead and first/last/nth_value no
1238+
// `AggStep` ran during the input scan. Emit `AggStep` + `AggValue` here,
1239+
// inside the per-row loop over buffered rows: `AggStep` advances the
1240+
// function's state for the current row, then `AggValue` reads the
1241+
// resulting value into the result register that gets written to this
1242+
// row's output. This is the site that gives every buffered row in a
1243+
// peer group a distinct value. Functions where
1244+
// `one_value_per_peer_group()` is true were handled above the loop and
1245+
// are skipped here.
12051246
for (i, func) in window.functions.iter().enumerate() {
12061247
if let AccumulatorFunc::Window(win_func) = &func.func {
1248+
if win_func.one_value_per_peer_group() {
1249+
continue;
1250+
}
12071251
let acc_reg = registers.acc_start + i;
12081252
let dest_reg = registers.acc_result_start + i;
12091253
program.emit_insn(Insn::AggStep {

0 commit comments

Comments
 (0)