Skip to content

Commit 4effb4b

Browse files
committed
fix: per-stage CLI runner deadlocked on output larger than the pipe buffer (issue #4)
The per-stage DuckDB runner spawned the CLI with stdout/stderr piped, then polled try_wait() to completion and only read the output via wait_with_output() AFTER the process exited. A child process's output pipe has a fixed OS buffer (~64 KiB on Windows); once the CLI's result exceeded it, DuckDB blocked writing stdout while the engine blocked waiting for exit. The result was a permanent deadlock.

This bit the per-stage path's node preview: `SELECT * FROM <node> LIMIT 100`. For a wide table (a 36-column Oracle date dimension serialized to ~128 KiB of JSON) it hung every run, on the SOURCE node's preview, before the sink stage ever executed. The symptoms matched exactly: the source row count was correct (COUNT(*) is tiny, well under the buffer), the sink produced no file (it was never reached), and the temp DB was left on disk (the run never reached cleanup). SQL Server "worked" only because that table was narrow enough to stay under the buffer - the failure was table-width-specific, not Oracle-specific. The batched (pure-SQL) path was immune because it already drains stdout on a thread.

The runner now drains stdout and stderr on dedicated threads while the process runs, so a result of any size completes. Cancellation still kills the child and joins the readers.

Adds a regression test: a wide CSV whose 100-row preview exceeds the pipe buffer, forced onto the per-stage path, must complete.
1 parent cd68f84 commit 4effb4b

2 files changed

Lines changed: 95 additions & 10 deletions

File tree

crates/duckdb-engine/src/lib.rs

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -165,35 +165,68 @@ impl DuckdbEngine {
165165
.spawn()
166166
.map_err(|e| EngineError::Other(format!("could not start duckdb: {}", e)))?;
167167

168-
loop {
168+
// Drain stdout AND stderr on dedicated threads so the child can
169+
// never deadlock against a full OS pipe buffer. The previous code
170+
// polled try_wait() to completion and only called wait_with_output()
171+
// *after* the process exited - but a Windows anonymous pipe holds
172+
// only ~64 KiB, so once DuckDB's result exceeds that it blocks
173+
// writing stdout while we block waiting for it to exit. A wide-table
174+
// preview (`SELECT * ... LIMIT 100` over ~36 columns is ~128 KiB)
175+
// hit this exactly, hanging the whole pipeline on the source node's
176+
// preview before it ever reached the sink (issue #4). Concurrent
177+
// readers keep the pipe drained regardless of result size.
178+
use std::io::Read;
179+
let mut stdout_pipe = child
180+
.stdout
181+
.take()
182+
.ok_or_else(|| EngineError::Other("duckdb stdout not captured".into()))?;
183+
let mut stderr_pipe = child
184+
.stderr
185+
.take()
186+
.ok_or_else(|| EngineError::Other("duckdb stderr not captured".into()))?;
187+
let stdout_reader = std::thread::spawn(move || {
188+
let mut buf = Vec::new();
189+
let _ = stdout_pipe.read_to_end(&mut buf);
190+
buf
191+
});
192+
let stderr_reader = std::thread::spawn(move || {
193+
let mut buf = Vec::new();
194+
let _ = stderr_pipe.read_to_end(&mut buf);
195+
buf
196+
});
197+
198+
let status = loop {
169199
match child.try_wait() {
170-
Ok(Some(_)) => break,
200+
Ok(Some(s)) => break s,
171201
Ok(None) => {
172202
if self.cancel.load(Ordering::Relaxed) {
173203
let _ = child.kill();
174204
let _ = child.wait();
205+
// Killing closes the pipes, so the reader threads
206+
// unblock; join them so their handles are released.
207+
let _ = stdout_reader.join();
208+
let _ = stderr_reader.join();
175209
return Err(EngineError::Cancelled);
176210
}
177211
std::thread::sleep(std::time::Duration::from_millis(40));
178212
}
179213
Err(e) => return Err(EngineError::Other(e.to_string())),
180214
}
181-
}
215+
};
182216

183-
let out = child
184-
.wait_with_output()
185-
.map_err(|e| EngineError::Other(e.to_string()))?;
186-
if !out.status.success() {
187-
let mut msg = String::from_utf8_lossy(&out.stderr).trim().to_string();
217+
let stdout_bytes = stdout_reader.join().unwrap_or_default();
218+
let stderr_bytes = stderr_reader.join().unwrap_or_default();
219+
if !status.success() {
220+
let mut msg = String::from_utf8_lossy(&stderr_bytes).trim().to_string();
188221
if msg.is_empty() {
189-
msg = String::from_utf8_lossy(&out.stdout).trim().to_string();
222+
msg = String::from_utf8_lossy(&stdout_bytes).trim().to_string();
190223
}
191224
if msg.is_empty() {
192225
msg = "DuckDB CLI exited with an error".into();
193226
}
194227
return Err(EngineError::Query(msg));
195228
}
196-
Ok(String::from_utf8_lossy(&out.stdout).into_owned())
229+
Ok(String::from_utf8_lossy(&stdout_bytes).into_owned())
197230
}
198231

199232
/// Run SQL and return the first JSON array of rows it printed

crates/duckdb-engine/tests/execution.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,58 @@ fn csv_to_csv_roundtrip_preserves_rows() {
237237
assert_eq!(count(&format!("read_csv_auto('{}')", out)), 3);
238238
}
239239

240+
#[test]
fn per_stage_wide_preview_does_not_deadlock() {
    // Regression test for issue #4.
    //
    // The per-stage CLI runner used to leave the child's stdout sitting in
    // the OS pipe and read it only once the process had exited. When a node
    // preview (`SELECT * ... LIMIT 100`) serialized to more JSON than the
    // ~64 KiB Windows pipe buffer holds, DuckDB blocked on its write while
    // the engine blocked waiting for exit, so the pipeline hung on the
    // source node's preview and the sink never ran. (The original report
    // was a 36-column Oracle date dimension with a ~128 KiB preview.) The
    // runner now drains stdout and stderr concurrently.
    //
    // No driver source is needed to reproduce it: a wide CSV (100-row
    // preview of roughly 150 KiB) is enough, as long as the pipeline is
    // pushed onto the per-stage path. Setting memoryLimitMb on a node does
    // that; the batched path already drained stdout on a thread.
    const COLUMN_COUNT: usize = 8;
    const ROW_COUNT: usize = 200;
    const CELL_WIDTH: usize = 200; // 8 cells of 200 chars -> ~1.6 KiB per row

    let workdir = tempfile::tempdir().unwrap();

    // Header is c0,c1,...; every data row is the same line of filler cells.
    let header_line = (0..COLUMN_COUNT)
        .map(|idx| format!("c{}", idx))
        .collect::<Vec<_>>()
        .join(",");
    let filler = "x".repeat(CELL_WIDTH);
    let data_line = vec![filler.as_str(); COLUMN_COUNT].join(",");

    let mut wide_csv = String::new();
    wide_csv.push_str(&header_line);
    wide_csv.push('\n');
    for _ in 0..ROW_COUNT {
        wide_csv.push_str(&data_line);
        wide_csv.push('\n');
    }

    let source_path = write_file(workdir.path(), "wide.csv", &wide_csv);
    let sink_path = out_path(workdir.path(), "wide_out.csv");

    let engine = engine_or_skip!();

    let nodes = json!([
        node("s1", "src.csv", json!({ "path": source_path, "hasHeader": true })),
        // memoryLimitMb is only here to select the per-stage path (where the
        // buggy runner lived); its value does not matter to this test.
        node("k1", "snk.csv", json!({ "path": sink_path, "hasHeader": true, "memoryLimitMb": 512 })),
    ]);
    let edges = json!([main_edge("e1", "s1", "k1")]);
    let pipeline = doc(nodes, edges);

    let outcome = engine.execute_pipeline(&pipeline);
    assert_eq!(outcome.status, "ok", "wide per-stage run failed/hung: {:?}", outcome.error);
    assert!(Path::new(&sink_path).exists());

    let written_rows = count(&format!("read_csv_auto('{}')", sink_path));
    assert_eq!(written_rows, ROW_COUNT as i64);
}
}
291+
240292
#[test]
241293
fn aggregate_groups_and_sums() {
242294
let tmp = tempfile::tempdir().unwrap();

0 commit comments

Comments
 (0)