Skip to content

Commit 86a4a4c

Browse files
authored
fix(rust, python): early error on duplicate names in streaming groupby (#5638)
1 parent cc83dd2 commit 86a4a4c

File tree

4 files changed

+38
-6
lines changed

4 files changed

+38
-6
lines changed

polars/polars-core/src/frame/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ pub struct DataFrame {
136136
pub(crate) columns: Vec<Series>,
137137
}
138138

139-
fn duplicate_err(name: &str) -> PolarsResult<()> {
139+
pub fn _duplicate_err(name: &str) -> PolarsResult<()> {
140140
Err(PolarsError::Duplicate(
141141
format!("Column with name: '{}' has more than one occurrences", name).into(),
142142
))
@@ -254,7 +254,7 @@ impl DataFrame {
254254
let name = s.name();
255255

256256
if names.contains(name) {
257-
duplicate_err(name)?
257+
_duplicate_err(name)?
258258
}
259259

260260
names.insert(name);
@@ -282,7 +282,7 @@ impl DataFrame {
282282
let name = series.name().to_string();
283283

284284
if names.contains(&name) {
285-
duplicate_err(&name)?
285+
_duplicate_err(&name)?
286286
}
287287

288288
series_cols.push(series);
@@ -1425,7 +1425,7 @@ impl DataFrame {
14251425
let mut names = PlHashSet::with_capacity(cols.len());
14261426
for name in cols {
14271427
if !names.insert(name.as_str()) {
1428-
duplicate_err(name)?
1428+
_duplicate_err(name)?
14291429
}
14301430
}
14311431
}

polars/polars-lazy/polars-plan/src/logical_plan/builder.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::ops::Deref;
33
use std::path::PathBuf;
44
use std::sync::Mutex;
55

6+
use polars_core::frame::_duplicate_err;
67
use polars_core::frame::explode::MeltArgs;
78
use polars_core::prelude::*;
89
use polars_core::utils::try_get_supertype;
@@ -426,6 +427,20 @@ impl LogicalPlanBuilder {
426427
);
427428
schema.merge(other);
428429

430+
if schema.len() < keys.len() + aggs.len() {
431+
let check_names = || {
432+
let mut names = PlHashSet::with_capacity(schema.len());
433+
for expr in aggs.iter().chain(keys.iter()) {
434+
let name = expr_output_name(expr)?;
435+
if !names.insert(name.clone()) {
436+
return _duplicate_err(name.as_ref());
437+
}
438+
}
439+
Ok(())
440+
};
441+
try_delayed!(check_names(), &self.0, into)
442+
}
443+
429444
#[cfg(feature = "dynamic_groupby")]
430445
{
431446
let index_columns = &[

polars/polars-lazy/polars-plan/src/utils.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use polars_core::prelude::*;
66

77
use crate::logical_plan::iterator::ArenaExprIter;
88
use crate::logical_plan::Context;
9-
#[cfg(feature = "meta")]
109
use crate::prelude::names::COUNT;
1110
use crate::prelude::*;
1211

@@ -136,7 +135,6 @@ pub fn has_null(current_expr: &Expr) -> bool {
136135
}
137136

138137
/// output name of expr
139-
#[cfg(feature = "meta")]
140138
pub(crate) fn expr_output_name(expr: &Expr) -> PolarsResult<Arc<str>> {
141139
for e in expr {
142140
match e {

py-polars/tests/unit/test_streaming.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from datetime import date
22

33
import numpy as np
4+
import pytest
45

56
import polars as pl
67

@@ -71,6 +72,24 @@ def test_streaming_groupby_types() -> None:
7172
"date_last": [date(2022, 1, 1)],
7273
}
7374

75+
with pytest.raises(pl.DuplicateError):
76+
(
77+
df.lazy()
78+
.groupby("person_id")
79+
.agg(
80+
[
81+
pl.col("person_name").first().alias("str_first"),
82+
pl.col("person_name").last().alias("str_last"),
83+
pl.col("person_name").mean().alias("str_mean"),
84+
pl.col("person_name").sum().alias("str_sum"),
85+
pl.col("bool").first().alias("bool_first"),
86+
pl.col("bool").last().alias("bool_first"),
87+
]
88+
)
89+
.select(pl.all().exclude("person_id"))
90+
.collect(streaming=True)
91+
)
92+
7493

7594
def test_streaming_non_streaming_gb() -> None:
7695
n = 100

0 commit comments

Comments
 (0)