Skip to content

Commit d7ccb93

Browse files
authored
perf(window): return Series from window ops to reduce intermediate RecordBatch copies (#7011)
Refactored the `finalize()` path in four window function physical nodes to reduce peak memory usage.

Previously, each window operation method returned a new RecordBatch containing all original columns plus the new window column. Calling N window functions on a partition therefore created N intermediate RecordBatch copies, each one column wider than the last. Now, window operation methods return only the new Series column. All new columns are collected first, then appended to the original partition in a single union.

```
| Operation             | Before    | Now       | Δ    |
|-----------------------|-----------|-----------|------|
| partition & order by  | 17,009 MB | 14,750 MB | −13% |
| partition_only        | 13,918 MB | 9,400 MB  | −32% |
| dynamic_frame         | 17,917 MB | 14,428 MB | −19% |
| order_by_only         | 15,170 MB | 10,370 MB | −32% |
```

This is one of a few optimizations that can help with #5316.
1 parent afb8558 commit d7ccb93

4 files changed

Lines changed: 155 additions & 187 deletions

File tree

src/daft-local-execution/src/sinks/window_order_by_only.rs

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use daft_dsl::{
88
expr::bound_expr::{BoundExpr, BoundWindowExpr},
99
};
1010
use daft_micropartition::MicroPartition;
11+
use daft_recordbatch::RecordBatch;
1112
use itertools::Itertools;
1213
use tracing::{Span, instrument};
1314

@@ -136,35 +137,35 @@ impl BlockingSink for WindowOrderByOnlySink {
136137
continue;
137138
}
138139

139-
let mut result_batch = batch;
140-
141-
// Apply each window expression
142-
for (wexpr, name) in params.window_exprs.iter().zip(&params.aliases) {
143-
result_batch = match wexpr.as_ref() {
144-
WindowExpr::RowNumber => {
145-
result_batch.window_row_number(name.clone())?
146-
}
147-
WindowExpr::Rank => result_batch.window_rank(
148-
name.clone(),
149-
&params.order_by,
150-
false,
151-
)?,
152-
WindowExpr::DenseRank => result_batch.window_rank(
153-
name.clone(),
154-
&params.order_by,
155-
true,
156-
)?,
157-
_ => {
158-
return Err(DaftError::ValueError(
140+
let new_cols: Vec<Series> = params
141+
.window_exprs
142+
.iter()
143+
.zip(&params.aliases)
144+
.map(|(window_expr, name)| -> DaftResult<Series> {
145+
match window_expr.as_ref() {
146+
WindowExpr::RowNumber => batch.window_row_number_col(name),
147+
WindowExpr::Rank => {
148+
batch.window_rank_col(name, &params.order_by, false)
149+
}
150+
WindowExpr::DenseRank => {
151+
batch.window_rank_col(name, &params.order_by, true)
152+
}
153+
_ => Err(DaftError::ValueError(
159154
format!(
160155
"Unsupported window function for order by only: {:?}",
161-
wexpr
156+
window_expr
162157
)
163158
.into(),
164-
));
159+
)),
165160
}
166-
};
167-
}
161+
})
162+
.collect::<DaftResult<_>>()?;
163+
164+
let result_batch = if new_cols.is_empty() {
165+
batch
166+
} else {
167+
batch.union(&RecordBatch::from_nonempty_columns(new_cols)?)?
168+
};
168169
out_batches.push(result_batch);
169170
}
170171

src/daft-local-execution/src/sinks/window_partition_and_dynamic_frame.rs

Lines changed: 36 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -169,41 +169,49 @@ impl BlockingSink for WindowPartitionAndDynamicFrameSink {
169169
input_data.eval_expression_list(&params.partition_by)?;
170170
let (_, partitionvals_indices) = partitionby_table.make_groups()?;
171171

172-
let mut partitions = partitionvals_indices
172+
let grouped_results: Vec<RecordBatch> = partitionvals_indices
173173
.iter()
174-
.map(|indices| {
174+
.map(|indices| -> DaftResult<RecordBatch> {
175175
let indices_arr =
176176
UInt64Array::from_vec("indices", indices.to_vec());
177-
input_data.take(&indices_arr).unwrap()
178-
})
179-
.collect::<Vec<_>>();
180-
181-
for partition in &mut partitions {
182-
// Sort the partition by the order_by columns
183-
*partition = partition.sort(
184-
&params.order_by,
185-
&params.descending,
186-
&params.nulls_first,
187-
)?;
188-
189-
for (agg_expr, name) in
190-
params.aggregations.iter().zip(params.aliases.iter())
191-
{
192-
let dtype =
193-
agg_expr.as_ref().to_field(&params.original_schema)?.dtype;
194-
*partition = partition.window_agg_dynamic_frame(
195-
name.clone(),
196-
agg_expr,
177+
let partition = input_data.take(&indices_arr)?;
178+
let partition = partition.sort(
197179
&params.order_by,
198180
&params.descending,
199-
params.min_periods,
200-
&dtype,
201-
&params.frame,
181+
&params.nulls_first,
202182
)?;
203-
}
204-
}
205183

206-
let final_result = RecordBatch::concat(&partitions)?;
184+
let new_cols: Vec<Series> = params
185+
.aggregations
186+
.iter()
187+
.zip(params.aliases.iter())
188+
.map(|(agg_expr, name)| -> DaftResult<Series> {
189+
let dtype = agg_expr
190+
.as_ref()
191+
.to_field(&params.original_schema)?
192+
.dtype;
193+
partition.window_agg_dynamic_frame_col(
194+
name,
195+
agg_expr,
196+
&params.order_by,
197+
&params.descending,
198+
params.min_periods,
199+
&dtype,
200+
&params.frame,
201+
)
202+
})
203+
.collect::<DaftResult<_>>()?;
204+
205+
if new_cols.is_empty() {
206+
Ok(partition)
207+
} else {
208+
partition
209+
.union(&RecordBatch::from_nonempty_columns(new_cols)?)
210+
}
211+
})
212+
.collect::<DaftResult<_>>()?;
213+
214+
let final_result = RecordBatch::concat(&grouped_results)?;
207215
Ok(final_result)
208216
});
209217
}

src/daft-local-execution/src/sinks/window_partition_and_order_by.rs

Lines changed: 58 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -160,67 +160,69 @@ impl BlockingSink for WindowPartitionAndOrderBySink {
160160
input_data.eval_expression_list(&params.partition_by)?;
161161
let (_, groupvals_indices) = groupby_table.make_groups()?;
162162

163-
let mut partitions = groupvals_indices
163+
let grouped_results: Vec<RecordBatch> = groupvals_indices
164164
.iter()
165-
.map(|indices| {
165+
.map(|indices| -> DaftResult<RecordBatch> {
166166
let indices_arr =
167167
UInt64Array::from_vec("indices", indices.to_vec());
168-
input_data.take(&indices_arr).unwrap()
169-
})
170-
.collect::<Vec<_>>();
171-
172-
for partition in &mut partitions {
173-
// Sort the partition by the order_by columns
174-
*partition = partition.sort(
175-
&params.order_by,
176-
&params.descending,
177-
&params.nulls_first,
178-
)?;
179-
180-
for (window_expr, name) in
181-
params.window_exprs.iter().zip(params.aliases.iter())
182-
{
183-
*partition = match window_expr.as_ref() {
184-
WindowExpr::Agg(agg_expr) => {
185-
let new_col = partition
186-
.eval_expression(&BoundExpr::new_unchecked(
187-
Arc::new(Expr::Agg(agg_expr.clone())),
188-
))?
189-
.broadcast(partition.len())?
190-
.rename(name.clone());
191-
let agg_batch =
192-
RecordBatch::from_nonempty_columns(vec![new_col])?;
193-
partition.union(&agg_batch)?
194-
}
195-
WindowExpr::RowNumber => {
196-
partition.window_row_number(name.clone())?
197-
}
198-
WindowExpr::Rank => partition.window_rank(
199-
name.clone(),
200-
&params.order_by,
201-
false,
202-
)?,
203-
WindowExpr::DenseRank => partition.window_rank(
204-
name.clone(),
205-
&params.order_by,
206-
true,
207-
)?,
208-
WindowExpr::Offset {
209-
input,
210-
offset,
211-
default,
212-
} => partition.window_offset(
213-
name.clone(),
214-
BoundExpr::new_unchecked(input.clone()),
215-
*offset,
216-
default.clone().map(BoundExpr::new_unchecked),
217-
)?,
168+
let partition = input_data.take(&indices_arr)?;
169+
let partition = partition.sort(
170+
&params.order_by,
171+
&params.descending,
172+
&params.nulls_first,
173+
)?;
174+
175+
let new_cols: Vec<Series> = params
176+
.window_exprs
177+
.iter()
178+
.zip(params.aliases.iter())
179+
.map(|(window_expr, name)| -> DaftResult<Series> {
180+
match window_expr.as_ref() {
181+
WindowExpr::Agg(agg_expr) => {
182+
let agg = partition.eval_expression(
183+
&BoundExpr::new_unchecked(Arc::new(
184+
Expr::Agg(agg_expr.clone()),
185+
)),
186+
)?;
187+
Ok(agg.broadcast(partition.len())?.rename(name))
188+
}
189+
WindowExpr::RowNumber => {
190+
partition.window_row_number_col(name)
191+
}
192+
WindowExpr::Rank => partition.window_rank_col(
193+
name,
194+
&params.order_by,
195+
false,
196+
),
197+
WindowExpr::DenseRank => partition.window_rank_col(
198+
name,
199+
&params.order_by,
200+
true,
201+
),
202+
WindowExpr::Offset {
203+
input,
204+
offset,
205+
default,
206+
} => partition.window_offset_col(
207+
name,
208+
BoundExpr::new_unchecked(input.clone()),
209+
*offset,
210+
default.clone().map(BoundExpr::new_unchecked),
211+
),
212+
}
213+
})
214+
.collect::<DaftResult<_>>()?;
215+
216+
if new_cols.is_empty() {
217+
Ok(partition)
218+
} else {
219+
partition
220+
.union(&RecordBatch::from_nonempty_columns(new_cols)?)
218221
}
219-
}
220-
}
222+
})
223+
.collect::<DaftResult<_>>()?;
221224

222-
let final_result = RecordBatch::concat(&partitions)?;
223-
Ok(final_result)
225+
RecordBatch::concat(&grouped_results)
224226
});
225227
}
226228

0 commit comments

Comments
 (0)