Skip to content

Fix silent streaming error handling#664

Open
utay wants to merge 1 commit intoduckdb:mainfrom
altertable-ai:yu/streaming-error
Open

Fix silent streaming error handling#664
utay wants to merge 1 commit intoduckdb:mainfrom
altertable-ai:yu/streaming-error

Conversation

@utay
Copy link
Copy Markdown

@utay utay commented Jan 20, 2026

When duckdb_stream_fetch_chunk returns null, it could indicate either end-of-stream or an error, but the code previously returned None in both cases, silently swallowing errors. This PR checks duckdb_result_error to distinguish between the two and returns Result<Option<...>> so callers can handle streaming failures.

Copy link
Copy Markdown
Member

@mlafeldt mlafeldt left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

Good fix and worth the breaking change. 👍

Left some comments for you to address before merging.

let schema = FFI_ArrowSchema::try_from(schema.deref()).ok()?;
let schema = FFI_ArrowSchema::try_from(schema.deref())
.map_err(|e| Error::DuckDBFailure(ffi::Error::new(ffi::DuckDBError), Some(e.to_string())))?;
let array_data = from_ffi(arrays, &schema).expect("ok");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that streaming_step returns a Result, we should replace .expect with .map_err(|e| ...)? too.

Ok(None) => None,
Err(e) => {
// Clear the statement to prevent further iteration after error
self.stmt = None;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No test verifies iterator yields None here. Please add assert!(stream.next().is_none()).

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice to have: impl FusedIterator for ArrowStream<'_> {} to document this behavior

let stream = stmt.stream_arrow([], schema)?;

let mut ok_batches = 0usize;
let mut stream = stream;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove this and make the stream above mut.

assert!(ok_batches > 0, "Expected at least one batch before error");
let msg = format!("{e}");
assert!(
msg == "INTERRUPT Error: Interrupted!",
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

msg.contains("INTERRUPT") seems less brittle, but your call.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants