When multiple CDK-based captures run tasks concurrently, the synchronous `Task.checkpoint()` method blocks the entire asyncio event loop while writing buffered documents to stdout. This means when one task flushes its buffer, all other tasks are blocked from fetching data, processing records, or building up their own document buffers.

We can make `Task.checkpoint` async so that flushing a buffer does not block other tasks. Blocking I/O when flushing a buffer is now run with `asyncio.to_thread`, freeing the main event loop to run other coroutines while a buffer is being flushed. A module-level `asyncio.Lock` is used to serialize emissions to stdout. This ensures only a single task flushes its buffer at a time, preventing data from multiple buffers from being interleaved or corrupted while being written to stdout.

With this change, while one task holds the lock and emits its buffer, other tasks can continue fetching data from the API, processing and parsing responses, and capturing documents to build up their own buffers. When the emitting task releases the lock, another waiting task can begin flushing its buffer.

This commit makes `Task.checkpoint()` async, which is a breaking change. All connectors calling `Task.checkpoint` must now await it. The connectors in the `connectors` repo affected by this are updated in this commit as well.
JustinASmith
approved these changes
Jan 14, 2026
Contributor
JustinASmith
left a comment
This looks good to me and aligns with the discussions on Slack! 🚀
Description:
When multiple tasks run concurrently in a CDK-based capture, the synchronous `Task.checkpoint()` method blocks the entire `asyncio` event loop while writing buffered documents to stdout. This means when one task flushes a large buffer, all other tasks are blocked from fetching data, processing rows, or building up their own document buffers.

We can make `Task.checkpoint` async in order to improve concurrency. Blocking I/O when flushing a buffer is run with `asyncio.to_thread`, freeing the main event loop to run other coroutines while a buffer is being flushed. A module-level `asyncio.Lock` is used to serialize emissions to stdout. This prevents documents and checkpoints from being interleaved or corrupted between tasks trying to flush their buffers.

With this change, while one task holds the lock and emits its buffer, other tasks can continue fetching data from the API, processing and parsing responses, and capturing documents to build up their own buffers. When the emitting task releases the lock, another waiting task can begin flushing its buffer.

This commit makes `Task.checkpoint()` async, which is a breaking change. All connectors calling `Task.checkpoint` must now await it. The connectors in the `connectors` repo affected by this are updated in this commit as well.

Workflow steps:
(How does one use this feature, and how has it changed)
Documentation links affected:
(list any documentation links that you created, or existing ones that you've identified as needing updates, along with a brief description)
Notes for reviewers:
Tested on a local stack. Confirmed `Task.checkpoint` no longer blocks the `asyncio` event loop.

Note: I also made `BaseCaptureConnector.checkpoint` and `BaseCaptureConnector._emit` async as well and made them use the same `asyncio.Lock` to ensure they don't emit to `stdout` at the same time as any `Task`s. Right now, that won't ever happen because `BaseCaptureConnector._emit` is only called before `Task` creation, but it's a good future-proofing idea to ensure `stdout` is behind a lock no matter who's writing to it.
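One way to check the "no longer blocks the event loop" claim in isolation is a heartbeat coroutine that counts how often it gets scheduled while a slow flush is in progress. The sketch below is an assumption about how such a check could look, not the test that was actually run. `blocking_flush`, `count_ticks`, and `ticks_during_flush` are hypothetical names, and `time.sleep` stands in for a slow write to stdout.

```python
import asyncio
import time


def blocking_flush(seconds: float) -> None:
    # Stand-in for a slow, blocking write to stdout.
    time.sleep(seconds)


async def count_ticks(stop: asyncio.Event, interval: float = 0.01) -> int:
    # Heartbeat: counts how many times the event loop lets this coroutine run.
    ticks = 0
    while not stop.is_set():
        await asyncio.sleep(interval)
        ticks += 1
    return ticks


async def ticks_during_flush(use_thread: bool, seconds: float = 0.2) -> int:
    stop = asyncio.Event()
    ticker = asyncio.create_task(count_ticks(stop))
    await asyncio.sleep(0)  # yield once so the heartbeat starts
    if use_thread:
        # Flush in a worker thread: the loop stays free for the heartbeat.
        await asyncio.to_thread(blocking_flush, seconds)
    else:
        # Flush directly in the coroutine: the loop is stalled until it returns.
        blocking_flush(seconds)
    stop.set()
    return await ticker


if __name__ == "__main__":
    blocked = asyncio.run(ticks_during_flush(use_thread=False))
    threaded = asyncio.run(ticks_during_flush(use_thread=True))
    print(f"ticks while blocking the loop: {blocked}")
    print(f"ticks with asyncio.to_thread:  {threaded}")
```

With the flush running directly in the coroutine, the heartbeat barely advances; with `asyncio.to_thread`, it keeps ticking for the whole duration of the flush. Exact counts depend on the machine and timer resolution.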