Skip to content

Commit b9ea05a

Browse files
refactor(rust): Fix slicing 0-width morsels on new streaming (#21940)
1 parent 697b162 commit b9ea05a

File tree

6 files changed

+6
-6
lines changed

6 files changed

+6
-6
lines changed

crates/polars-stream/src/nodes/filter.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ impl ComputeNode for FilterNode {
5858
df._filter_seq(mask)
5959
}).await?;
6060

61-
if morsel.df().is_empty() {
61+
if morsel.df().height() == 0 {
6262
continue;
6363
}
6464

crates/polars-stream/src/nodes/in_memory_source.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ impl ComputeNode for InMemorySourceNode {
9090

9191
// TODO: remove this 'always sent at least one morsel'
9292
// condition, see update_state.
93-
if df.is_empty() && seq > 0 {
93+
if df.height() == 0 && seq > 0 {
9494
break;
9595
}
9696

crates/polars-stream/src/nodes/io_sinks/partition/parted.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ impl SinkNode for PartedPartitionSinkNode {
150150
let mut recv_port = recv_port.serial();
151151
while let Ok(morsel) = recv_port.recv().await {
152152
let (mut df, seq, source_token, consume_token) = morsel.into_inner();
153-
if df.is_empty() {
153+
if df.height() == 0 {
154154
continue;
155155
}
156156

crates/polars-stream/src/nodes/io_sources/ndjson/negative_slice_pass.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ impl MorselStreamReverser {
195195
let mut df =
196196
combined_df.slice(row_offset.try_into().unwrap(), chunk_size);
197197

198-
assert!(!df.is_empty()); // If we did our calculations properly
198+
assert!(df.height() > 0); // If we did our calculations properly
199199

200200
if let Some(row_index) = row_index.clone() {
201201
let offset = row_index.offset.saturating_add(

crates/polars-stream/src/nodes/merge_sorted.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ fn find_mergeable(
5656
) -> PolarsResult<Option<(DataFrame, DataFrame)>> {
5757
fn first_non_empty(vd: &mut VecDeque<DataFrame>) -> Option<DataFrame> {
5858
let mut df = vd.pop_front()?;
59-
while df.is_empty() {
59+
while df.height() == 0 {
6060
df = vd.pop_front()?;
6161
}
6262
Some(df)

crates/polars-stream/src/nodes/streaming_slice.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ impl ComputeNode for StreamingSliceNode {
8181
morsel.source_token().stop();
8282
}
8383

84-
if !morsel.df().is_empty() && send.send(morsel).await.is_err() {
84+
if morsel.df().height() > 0 && send.send(morsel).await.is_err() {
8585
break;
8686
}
8787

0 commit comments

Comments
 (0)