Skip to content

Commit 711ce9a

Browse files
authored
fix: Deadlock in new-streaming multiplexer (#21963)
1 parent 3d6752b commit 711ce9a

File tree

1 file changed

+10
-7
lines changed

1 file changed

+10
-7
lines changed

crates/polars-stream/src/nodes/multiplexer.rs

+10-7
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::collections::VecDeque;
33
use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
44

55
use super::compute_node_prelude::*;
6+
use crate::async_primitives::wait_group::WaitGroup;
67
use crate::morsel::SourceToken;
78

89
// TODO: replace this with an out-of-core buffering solution.
@@ -138,9 +139,10 @@ impl ComputeNode for MultiplexerNode {
138139
let buffered_source_token = buffered_source_token.clone();
139140
join_handles.push(scope.spawn_task(TaskPriority::High, async move {
140141
loop {
141-
let Ok(morsel) = receiver.recv().await else {
142+
let Ok(mut morsel) = receiver.recv().await else {
142143
break;
143144
};
145+
drop(morsel.take_consume_token());
144146

145147
let mut anyone_interested = false;
146148
let mut active_listener_interested = false;
@@ -154,11 +156,7 @@ impl ComputeNode for MultiplexerNode {
154156
Err(_) => *buf_sender = Listener::Inactive,
155157
},
156158
Listener::Buffering(b) => {
157-
// Make sure to count buffered morsels as
158-
// consumed to not block the source.
159-
let mut m = morsel.clone();
160-
m.take_consume_token();
161-
b.push_front(m);
159+
b.push_front(morsel.clone());
162160
anyone_interested = true;
163161
},
164162
Listener::Inactive => {},
@@ -185,23 +183,28 @@ impl ComputeNode for MultiplexerNode {
185183
if let Some((buf, mut rx)) = opt_buf_recv {
186184
let mut sender = send_port.take().unwrap().serial();
187185

186+
let wait_group = WaitGroup::default();
188187
let buffered_source_token = buffered_source_token.clone();
189188
join_handles.push(scope.spawn_task(TaskPriority::High, async move {
190189
// First we try to flush all the old buffered data.
191190
while let Some(mut morsel) = buf.pop_back() {
192191
morsel.replace_source_token(buffered_source_token.clone());
192+
morsel.set_consume_token(wait_group.token());
193193
if sender.send(morsel).await.is_err()
194194
|| buffered_source_token.stop_requested()
195195
{
196196
break;
197197
}
198+
wait_group.wait().await;
198199
}
199200

200201
// Then send along data from the multiplexer.
201-
while let Some(morsel) = rx.recv().await {
202+
while let Some(mut morsel) = rx.recv().await {
203+
morsel.set_consume_token(wait_group.token());
202204
if sender.send(morsel).await.is_err() {
203205
break;
204206
}
207+
wait_group.wait().await;
205208
}
206209
Ok(())
207210
}));

0 commit comments

Comments
 (0)