Skip to content

Commit feb8cf9

Browse files
mesutoezdiljmhbh
andauthored
fix(bedrock): stream events incrementally instead of buffering (#1989)
Fixes #1986 `_run_converse_stream` used `list()` to collect all stream events before returning. This blocked the caller until the full response was ready. Replaced with an `asyncio.Queue` bridge so events are forwarded as they arrive from boto3. Signed-off-by: mesutoezdil <mesudozdil@gmail.com> Co-authored-by: J.M. Huibonhoa <jmhbh@users.noreply.github.com>
1 parent 32e7221 commit feb8cf9

1 file changed

Lines changed: 17 additions & 6 deletions

File tree

  • python/packages/kagent-adk/src/kagent/adk/models

python/packages/kagent-adk/src/kagent/adk/models/_bedrock.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -318,21 +318,32 @@ async def generate_content_async(
318318
if self.additional_model_request_fields:
319319
kwargs["additionalModelRequestFields"] = self.additional_model_request_fields
320320

321-
def _run_converse_stream(**kw):
322-
resp = client.converse_stream(**kw)
323-
return list(resp.get("stream", []))
324-
325321
try:
326322
if stream:
327-
stream_body = await asyncio.to_thread(_run_converse_stream, **kwargs)
323+
q: asyncio.Queue = asyncio.Queue()
324+
loop = asyncio.get_running_loop()
325+
326+
def _produce():
327+
try:
328+
resp = client.converse_stream(**kwargs)
329+
for event in resp.get("stream", []):
330+
loop.call_soon_threadsafe(q.put_nowait, event)
331+
except Exception as exc:
332+
loop.call_soon_threadsafe(q.put_nowait, exc)
333+
finally:
334+
loop.call_soon_threadsafe(q.put_nowait, None)
335+
336+
loop.run_in_executor(None, _produce)
328337

329338
aggregated_text = ""
330339
tool_uses: dict[str, dict] = {} # toolUseId -> {name, input_json}
331340
current_tool_id: Optional[str] = None
332341
stop_reason = "end_turn"
333342
usage_metadata: Optional[types.GenerateContentResponseUsageMetadata] = None
334343

335-
for event in stream_body:
344+
while (event := await q.get()) is not None:
345+
if isinstance(event, Exception):
346+
raise event
336347
if "contentBlockStart" in event:
337348
start = event["contentBlockStart"].get("start", {})
338349
if "toolUse" in start:

0 commit comments

Comments
 (0)