Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions columnar/src/block_accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,49 @@ impl<T: PartialOrd + Copy + std::fmt::Debug + Send + Sync + 'static + Default>
}
}

/// Like `fetch_block_with_missing`, but deduplicates (doc_id, value) pairs
/// so that each unique value per document is returned only once.
///
/// This is necessary for correct document counting in aggregations,
/// where multi-valued fields can produce duplicate entries that inflate counts.
#[inline]
pub fn fetch_block_with_missing_unique_per_doc(
    &mut self,
    docs: &[u32],
    accessor: &Column<T>,
    missing: Option<T>,
) {
    self.fetch_block_with_missing(docs, accessor, missing);
    // Only multi-valued columns can emit more than one (doc_id, value) pair
    // per document; full and optional cardinalities yield at most one value
    // per doc, so the dedup pass would be pure overhead for them.
    if accessor.index.get_cardinality().is_multivalue() {
        self.dedup_docid_val_pairs();
    }
}

/// Removes consecutive duplicate (doc_id, value) pairs from the caches.
///
/// After `fetch_block`, entries for the same doc are adjacent, so duplicates
/// (same doc, same value) are consecutive and can be removed in O(n).
fn dedup_docid_val_pairs(&mut self) {
if self.docid_cache.len() <= 1 {
return;
}
let mut write = 0;
for read in 1..self.docid_cache.len() {
if self.docid_cache[read] != self.docid_cache[write]
|| self.val_cache[read] != self.val_cache[write]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should check only for duplicate docids, not for duplicate values?

Can you extend the tests to capture this?

Copy link
Collaborator

@PSeitz PSeitz Mar 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, never mind — we need to check both, so that deduplication only filters duplicate values on the same doc_id while still handling multi-values.

Copy link
Collaborator

@PSeitz PSeitz Mar 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not cover pairs if the values are not consecutive, e.g. :

(0, 1), (0, 2), (0, 1)

It's a bit more expensive. I think we could

  • make groups of consecutive docids (just start and end pos)
  • if the group contains more than 2 elements sort by value
  • then do the same algorithm as now

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Values within each doc_id group are now sorted before deduplicating, and unit tests have been added.

{
write += 1;
if write != read {
self.docid_cache[write] = self.docid_cache[read];
self.val_cache[write] = self.val_cache[read];
}
}
}
let new_len = write + 1;
self.docid_cache.truncate(new_len);
self.val_cache.truncate(new_len);
}

#[inline]
pub fn iter_vals(&self) -> impl Iterator<Item = T> + '_ {
self.val_cache.iter().cloned()
Expand Down
18 changes: 10 additions & 8 deletions src/aggregation/bucket/term_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -807,11 +807,13 @@ impl<TermMap: TermAggregationMap, C: SubAggCache> SegmentAggregationCollector

let req_data = &mut self.terms_req_data;

agg_data.column_block_accessor.fetch_block_with_missing(
docs,
&req_data.accessor,
req_data.missing_value_for_accessor,
);
agg_data
.column_block_accessor
.fetch_block_with_missing_unique_per_doc(
docs,
&req_data.accessor,
req_data.missing_value_for_accessor,
);

if let Some(sub_agg) = &mut self.sub_agg {
let term_buckets = &mut self.parent_buckets[parent_bucket_id as usize];
Expand Down Expand Up @@ -2347,7 +2349,7 @@ mod tests {

// text field
assert_eq!(res["my_texts"]["buckets"][0]["key"], "Hello Hello");
assert_eq!(res["my_texts"]["buckets"][0]["doc_count"], 5);
assert_eq!(res["my_texts"]["buckets"][0]["doc_count"], 4);
assert_eq!(res["my_texts"]["buckets"][1]["key"], "Empty");
assert_eq!(res["my_texts"]["buckets"][1]["doc_count"], 2);
assert_eq!(
Expand All @@ -2356,7 +2358,7 @@ mod tests {
);
// text field with number as missing fallback
assert_eq!(res["my_texts2"]["buckets"][0]["key"], "Hello Hello");
assert_eq!(res["my_texts2"]["buckets"][0]["doc_count"], 5);
assert_eq!(res["my_texts2"]["buckets"][0]["doc_count"], 4);
assert_eq!(res["my_texts2"]["buckets"][1]["key"], 1337.0);
assert_eq!(res["my_texts2"]["buckets"][1]["doc_count"], 2);
assert_eq!(
Expand All @@ -2370,7 +2372,7 @@ mod tests {
assert_eq!(res["my_ids"]["buckets"][0]["key"], 1337.0);
assert_eq!(res["my_ids"]["buckets"][0]["doc_count"], 4);
assert_eq!(res["my_ids"]["buckets"][1]["key"], 1.0);
assert_eq!(res["my_ids"]["buckets"][1]["doc_count"], 3);
assert_eq!(res["my_ids"]["buckets"][1]["doc_count"], 2);
assert_eq!(res["my_ids"]["buckets"][2]["key"], serde_json::Value::Null);

Ok(())
Expand Down
Loading