perf(inline-agg): add BoolAnd and BoolOr accumulator types#6984
perf(inline-agg): add BoolAnd and BoolOr accumulator types#6984BABTUNA wants to merge 2 commits into
Conversation
Adds BoolAnd and BoolOr to the inline grouped aggregation path, completing inline coverage of the standard reducer-style aggregates (Count / Sum / Min / Max / Product / BoolAnd / BoolOr). Each accumulator holds a per-group `Option<bool>` state; first non-null value seeds the state, subsequent non-null values combine via `&&` (BoolAnd) or `||` (BoolOr). Output dtype is Boolean. Implementation note: `BooleanArray::values()` doesn't expose a `&[bool]` slice because Arrow stores bools bit-packed, so the null-free tight loop uses `self.source.to_bitmap()` + `bitmap.value(row_idx)` instead of the `.zip(values().iter())` pattern Sum/Product use over primitive slices. Functionally equivalent, just a different access pattern forced by the storage layout. Changes: - New `define_bool_and_accum!` and `define_bool_or_accum!` macros (kept separate per the Sum/Product precedent — semantically distinct ops with different identity and absorbing elements). - `define_agg_accumulator_enum!` extended with `BoolAnd` and `BoolOr` variants. - `try_create_accumulator` dispatches `AggExpr::BoolAnd(expr)` and `AggExpr::BoolOr(expr)` to the respective accumulators on `DataType::Boolean`. - `can_inline_agg` adds a separate Boolean-only dtype arm (the existing numeric arm for Sum/Min/Max/Product is unchanged). - 5 new tests (no-null tight loop, null-value branch, Int64 FNV fast path, all-null vals). Short-circuit opportunity (stop scanning a group once BoolAnd hits `false` or BoolOr hits `true`) intentionally deferred — adding a per-row branch to the hot loop would regress non-short-circuiting groups, and Sum/Min/Max have analogous opportunities that they also don't take. All 37 inline_agg tests pass (32 pre-existing + 5 new). cargo fmt and cargo clippy --release --features python both clean. Part of Eventual-Inc#6585 (item 7).
Greptile SummaryThis PR adds
Confidence Score: 5/5Safe to merge; the new accumulators are self-contained and do not touch any existing dispatch paths. Both BoolAndAccum and BoolOrAccum correctly implement skip-null SQL semantics, the bitmap access via to_bitmap().value(row_idx) correctly accounts for Arrow's internal buffer offset, and the finalize path properly produces nullable vs. non-null output. No existing dispatch paths are modified. No files require special attention. Important Files Changed
Flowchart%%{init: {'theme': 'neutral'}}%%
flowchart TD
A[AggExpr::BoolAnd / BoolOr] --> B{can_inline_agg?}
B -- dtype == Boolean --> C[try_create_accumulator]
B -- unsupported dtype --> D[fallback path]
C --> E[BoolAndAccum / BoolOrAccum::new]
E --> F{null_count == 0?}
F -- yes --> G[tight loop: to_bitmap]
F -- no --> H[null-aware loop: source.get]
G --> I[Option-bool per group]
H --> I
I --> J{any group is None?}
J -- yes --> K[from_iter nullable output]
J -- no --> L[from_values non-null output]
Reviews (2): Last reviewed commit: "test(inline-agg): cover BoolOr partial-n..." | Re-trigger Greptile |
| macro_rules! define_bool_and_accum { | ||
| ($name:ident) => { | ||
| struct $name { | ||
| accumulators: Vec<Option<bool>>, | ||
| source: BooleanArray, | ||
| } | ||
|
|
||
| impl $name { | ||
| fn new(source: BooleanArray) -> Self { | ||
| Self { | ||
| accumulators: Vec::new(), | ||
| source, | ||
| } | ||
| } | ||
|
|
||
| fn init_groups(&mut self, n: usize) { | ||
| self.accumulators.resize(n, None); | ||
| } | ||
|
|
||
| /// Vectorized batch update over pre-computed group_ids. | ||
| fn update_batch(&mut self, group_ids: &[u32]) { | ||
| let accs = &mut self.accumulators; | ||
| if self.source.null_count() == 0 { | ||
| // Tight loop: no null checks needed on source values. | ||
| let bitmap = self.source.to_bitmap(); | ||
| for (row_idx, &gid) in group_ids.iter().enumerate() { | ||
| let val = bitmap.value(row_idx); | ||
| let acc = &mut accs[gid as usize]; | ||
| *acc = Some(match *acc { | ||
| Some(a) => a && val, | ||
| None => val, | ||
| }); | ||
| } | ||
| } else { | ||
| // Source has nulls: check each value. | ||
| for (row_idx, &gid) in group_ids.iter().enumerate() { | ||
| if let Some(val) = self.source.get(row_idx) { | ||
| let acc = &mut accs[gid as usize]; | ||
| *acc = Some(match *acc { | ||
| Some(a) => a && val, | ||
| None => val, | ||
| }); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| fn finalize(self, name: &str) -> DaftResult<Series> { | ||
| let has_nulls = self.accumulators.iter().any(|a| a.is_none()); | ||
| if has_nulls { | ||
| Ok(BooleanArray::from_iter(name, self.accumulators.into_iter()).into_series()) | ||
| } else { | ||
| Ok(BooleanArray::from_values( | ||
| name, | ||
| self.accumulators.into_iter().map(|opt| opt.unwrap()), | ||
| ) | ||
| .into_series()) | ||
| } | ||
| } | ||
| } | ||
| }; | ||
| } | ||
|
|
||
| macro_rules! define_bool_or_accum { | ||
| ($name:ident) => { | ||
| struct $name { | ||
| accumulators: Vec<Option<bool>>, | ||
| source: BooleanArray, | ||
| } | ||
|
|
||
| impl $name { | ||
| fn new(source: BooleanArray) -> Self { | ||
| Self { | ||
| accumulators: Vec::new(), | ||
| source, | ||
| } | ||
| } | ||
|
|
||
| fn init_groups(&mut self, n: usize) { | ||
| self.accumulators.resize(n, None); | ||
| } | ||
|
|
||
| /// Vectorized batch update over pre-computed group_ids. | ||
| fn update_batch(&mut self, group_ids: &[u32]) { | ||
| let accs = &mut self.accumulators; | ||
| if self.source.null_count() == 0 { | ||
| // Tight loop: no null checks needed on source values. | ||
| let bitmap = self.source.to_bitmap(); | ||
| for (row_idx, &gid) in group_ids.iter().enumerate() { | ||
| let val = bitmap.value(row_idx); | ||
| let acc = &mut accs[gid as usize]; | ||
| *acc = Some(match *acc { | ||
| Some(a) => a || val, | ||
| None => val, | ||
| }); | ||
| } | ||
| } else { | ||
| // Source has nulls: check each value. | ||
| for (row_idx, &gid) in group_ids.iter().enumerate() { | ||
| if let Some(val) = self.source.get(row_idx) { | ||
| let acc = &mut accs[gid as usize]; | ||
| *acc = Some(match *acc { | ||
| Some(a) => a || val, | ||
| None => val, | ||
| }); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| fn finalize(self, name: &str) -> DaftResult<Series> { | ||
| let has_nulls = self.accumulators.iter().any(|a| a.is_none()); | ||
| if has_nulls { | ||
| Ok(BooleanArray::from_iter(name, self.accumulators.into_iter()).into_series()) | ||
| } else { | ||
| Ok(BooleanArray::from_values( | ||
| name, | ||
| self.accumulators.into_iter().map(|opt| opt.unwrap()), | ||
| ) | ||
| .into_series()) | ||
| } | ||
| } | ||
| } | ||
| }; | ||
| } | ||
|
|
||
| define_bool_and_accum!(BoolAndAccum); | ||
| define_bool_or_accum!(BoolOrAccum); | ||
|
|
||
| // --------------------------------------------------------------------------- | ||
| // AggAccumulator enum — eliminates vtable dispatch in the hot loop | ||
| // --------------------------------------------------------------------------- |
There was a problem hiding this comment.
Near-identical macros can be unified
define_bool_and_accum! and define_bool_or_accum! are structurally identical — the only difference is && vs || on the accumulation step. A single define_bool_logic_accum!($name:ident, $op:tt) macro would eliminate the duplication and make a future BoolXor or similar operator a one-liner. As written, a bug fixed in one macro (e.g. in finalize or update_batch) must be replicated manually in the other.
Rule Used: Prefer single parametrized functions over multiple... (source)
Learned From
Eventual-Inc/Daft#5207
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
There was a problem hiding this comment.
kinda leaning towards keeping them separate for readability.
| let fallback_result = rb.agg_groupby_fallback(&bound_agg, &group_by).unwrap(); | ||
| assert_batches_equal(&inline_result, &fallback_result); | ||
| } | ||
| } |
There was a problem hiding this comment.
Missing
BoolOr partial-nulls test
The test suite has test_inline_bool_and_with_nulls_matches_fallback exercising the null-handling branch for BoolAnd, but no corresponding test for BoolOr with a mix of null and non-null values. test_inline_all_null_bool_or_matches_fallback only covers the all-null case. Since the null branch in BoolOrAccum::update_batch is a verbatim copy with || substituted for &&, a one-line regression there would go undetected.
Addresses Greptile's P2 on Eventual-Inc#6984: BoolAnd has test_inline_bool_and_with_nulls_matches_fallback exercising the null-handling branch, but BoolOr only had an all-null test (test_inline_all_null_bool_or_matches_fallback). The BoolOrAccum::update_batch null branch is a verbatim copy with `||` substituted for `&&`, so a one-line regression there would go undetected. Adds test_inline_bool_or_with_nulls_matches_fallback, mirroring the BoolAnd-with-nulls test, reusing the existing make_bool_val_with_nulls_test_batch helper. 38 inline_agg tests pass (37 prior + 1 new).
|
@greptile re-review |
Summary
Implements BoolAnd and BoolOr accumulators from #6585 (item 7) for the inline grouped aggregation path. Each accumulator holds a per-group
Option<bool>state; first non-null value seeds the state, subsequent non-null values combine via&&(BoolAnd) or||(BoolOr). Output dtype is Boolean. Grouping semantics and final query results are unchanged.Why
AggExpr::BoolAndandAggExpr::BoolOralready exist in the DSL and are wired in the fallback path (src/daft-recordbatch/src/lib.rs→Series::bool_and(groups)/Series::bool_or(groups)), but currently fall back tomake_groups + eval_agg_expressioneven when the rest of the query qualifies for the inline path. Adding them to the inline accumulator framework completes inline coverage of the standard reducer-style aggregates (Count / Sum / Min / Max / Product / BoolAnd / BoolOr) that all share the sameVec<Option<T>>per-group state shape.Changes Made
src/daft-recordbatch/src/ops/inline_agg.rs:define_bool_and_accum!anddefine_bool_or_accum!macros (kept separate per the Sum/Product precedent — these are semantically distinct ops with different identity and absorbing elements).define_agg_accumulator_enum!extended withBoolAndandBoolOrvariants.try_create_accumulatordispatchesAggExpr::BoolAnd(expr)andAggExpr::BoolOr(expr)onDataType::Boolean.can_inline_aggadds a separate Boolean-only dtype arm; existing numeric arm for Sum/Min/Max/Product is unchanged.Implementation note:
BooleanArray::values()doesn't expose a&[bool]slice because Arrow stores bools bit-packed. The null-free tight loop usesself.source.to_bitmap()+bitmap.value(row_idx)instead of the.zip(values().iter())pattern Sum/Product use over primitive slices. Functionally equivalent, just a different access pattern forced by the storage layout.Behavior
BoolAnd/BoolOrover Boolean columns now take the inline path instead of falling back tomake_groups + eval_agg_expression.false/ BoolOr hitstrue). Adding a per-row branch to the hot loop would regress non-short-circuiting groups; Sum/Min/Max have analogous opportunities and intentionally don't take them. Revisit if benchmarks show it matters.Test Plan
cargo test -p daft-recordbatch --release inline_agg— 37 passed (32 pre-existing + 5 new).cargo fmt -p daft-recordbatch --check— clean.cargo clippy -p daft-recordbatch --release --features python— clean, no#[allow]s added.New test cases:
test_inline_bool_and_matches_fallback— Utf8 keys + Boolean vals (no-null tight loop).test_inline_bool_or_matches_fallback— Utf8 keys + Boolean vals (no-null tight loop, OR semantics).test_inline_int_key_bool_and_matches_fallback— Int64 keys + Boolean vals (FNV int-key fast path).test_inline_bool_and_with_nulls_matches_fallback— Boolean vals withNoneinterspersed (exercises null-value branch).test_inline_all_null_bool_or_matches_fallback— all-null vals (exercises emptyOption<bool>finalize path).Related Issues