Skip to content

[Bug] apply_sync_streaming swallows exceptions #9142

@max-muoto

Description

@max-muoto

What happened?

When using streamify with async_streaming=False, exceptions raised in the async generator end up silently swallowed due to how apply_sync_streaming is implemented:

def apply_sync_streaming(async_generator: AsyncGenerator) -> Generator:
    """Convert the async streaming generator to a sync generator."""
    queue = Queue()  # Queue to hold items from the async generator
    stop_sentinel = object()  # Sentinel to signal the generator is complete

    # To propagate prediction request ID context to the child thread
    context = contextvars.copy_context()

    def producer():
        """Runs in a background thread to fetch items asynchronously."""

        async def runner():
            try:
                async for item in async_generator:
                    queue.put(item)
            finally:
                # Signal completion
                queue.put(stop_sentinel)

        context.run(asyncio.run, runner())

    # Start the producer in a background thread
    thread = threading.Thread(target=producer, daemon=True)
    thread.start()

    # Consume items from the queue
    while True:
        item = queue.get()  # Block until an item is available
        if item is stop_sentinel:
            break
        yield item

Instead of explicitly catching exceptions and putting them in the queue to propagate to the caller, exceptions simply lead to the stop sentinel being sent..

DSPy version

3.0.4

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions