Skip to content

Panic: "Invalid input sender for input type" at input_sender.rs:32 in NativeRunner streaming (0.7.13 + 0.7.14) #7023

@TechyMT

Description

@TechyMT

Panic: Invalid input sender for input type at input_sender.rs:32

Summary

NativeRunner panics at src/daft-local-execution/src/input_sender.rs:32:18:

thread '<unnamed>' panicked at src/daft-local-execution/src/input_sender.rs:32:18:
internal error: entered unreachable code: Invalid input sender for input type

This is reached from daft/execution/native_executor.py:53 in stream_results while consuming df.to_arrow_iter() on a lazy DataFrame produced by read_parquet(... hive_partitioning=True, schema=...) → .where(col != literal) in a long-lived Python process. The only working code path is materializing the full result via df.to_arrow() and slicing it in Python.

Environment

  • daft: 0.7.13 — also reproduced on 0.7.14 (same panic, same line)
  • Python: 3.11.10
  • OS: macOS 14 (Darwin Kernel 24.6.0) arm64
  • Runner: NativeRunner (default; DAFT_RUNNER=py errors with ValueError: The PyRunner was removed from Daft from v0.5.0 onwards)
  • Set globally: daft.set_execution_config(scantask_max_parallel=1)

Panic (Python-side stack)

DaftError::External task <N> panicked with message
  "internal error: entered unreachable code: Invalid input sender for input type"

  File ".../daft/execution/native_executor.py", line 53, in stream_results
    yield batch
          └ None
GeneratorExit

daft.exceptions.DaftCoreException: DaftError::External task <N> panicked with message
  "internal error: entered unreachable code: Invalid input sender for input type"

The yield batch with batch=None in native_executor.py:53 looks like the streaming generator received a None it didn't expect.

Code shape that triggers it

df = daft.read_parquet(
    f"{prefix}/**/*.parquet",
    infer_schema=False,
    schema={...~15 fields, mostly string + a few int64/bool...},
    hive_partitioning=True,
)

publish_df = df.where(df["state"] != "DELETE")
delete_df  = df.where(df["state"] == "DELETE")

for batch in publish_df.to_arrow_iter():   # PANIC HERE
    # process the batch (build pydantic records, do work)
    # then write back: daft.from_pylist([...]).write_parquet(..., partition_cols=[...])
    ...

The DataFrame is consumed inside an async def running under asyncio.run(...), and the same Python process performs many sequential read_parquet / from_pylist / write_parquet(partition_cols=...) operations across the lifetime of the process.

Failed minimization

The panic is reliable in the full workload, but every standalone reduction I've tried streams successfully:

Variation Result
Synthetic 100K-row hive-partitioned dataset + where().to_arrow_iter() streams
Same shape with ~30-col schema + JSON-string payload streams
The actual real parquet file (copied byte-for-byte from the failing run) streams
Same as above wrapped in asyncio.run(main()) streams
Concurrent (two async tasks reading + iterating simultaneously) streams
All of the above with daft.set_execution_config(scantask_max_parallel=1) set streams

So the trigger is something specific to a long-lived process performing many daft operations — likely a combination of:

  • many sequential daft operations in the same Python process
  • DataFrames built from different sources (read_parquet + from_pylist + write_parquet(partition_cols=...)) in the same process
  • some global daft state I haven't identified

Happy to try further reductions if a maintainer has a hypothesis to test.

Workarounds tried

Attempt Outcome
df.into_batches(N).iter_partitions() (the 0.6.x streaming API) panics
df.into_batches(N).to_arrow_iter() panics
Bare df.to_arrow_iter() (no into_batches) panics
default_morsel_size=2048 matched to batch_size no effect, panics
DAFT_RUNNER=py ValueError: PyRunner removed in v0.5.0
Drop hive_partitioning=True, glob the specific partition_id=N/type_name=X/ path with a stripped schema panics
Remove all df.limit(1).count_rows() guards before iteration panics
arrow_table = df.to_arrow() + Python slicing the materialized table WORKS — the only path

Perf cost of the workaround

Materializing via to_arrow() (the only working path) serializes the consumer loop and costs ~2.3× wall-clock vs. the daft 0.6.x streaming baseline on the same ~80K-row workload (10 min → 22 min). Recovering streaming would be a large win.

Asks

  1. What inputs trigger the unreachable!() at input_sender.rs:32? A description of the InputSender enum variants and their expected inputs would help me narrow down what's being emitted that hits the panic.
  2. Is a fix planned for 0.7.x, or is this expected to be addressed by the broader streaming rework that the @daft.udf@daft.func/@daft.cls deprecation hints at in 0.8.x?
  3. Any debug knobs (RUST_BACKTRACE, daft logging flags) that would surface more diagnostic info from inside an async generator without rebuilding daft from source?

Thanks!

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions