Skip to content

Commit 7b25b8f

Browse files
google-genai-botcopybara-github
authored andcommitted
fix(runner): Yield buffered function_call/function_response events during live streaming
Bug: In live streaming mode, when function_call and function_response events arrive during active transcription, they are correctly buffered but never yielded to the caller. This causes callers to miss these events even though they are saved to the session. Fix: Add yield buffered_event after appending buffered events to the session when transcription ends. Testing: - Added unit test: test_live_streaming_buffered_function_call_yielded_during_transcription - Test verifies buffered events are yielded by: 1. Simulating partial transcription (triggers buffering) 2. Sending function_call during transcription (gets buffered) 3. Ending transcription (should yield buffered events) 4. Asserting both function_call and function_response are in yielded events Test results: - With fix: PASSED - Without fix (yield commented out): FAILED with "Buffered function_call event was not yielded" - Example event flow after fix: EVENT: partial=True, input_transcription="Show me the weather" EVENT: function_call=get_weather, args={'location': 'NYC'} <- Now yielded EVENT: function_response=get_weather, response={...} <- Now yielded EVENT: partial=False, input_transcription="Show me the weather for today" PiperOrigin-RevId: 859158546
1 parent 910f654 commit 7b25b8f

File tree

2 files changed

+112
-0
lines changed

2 files changed

+112
-0
lines changed

src/google/adk/runners.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -815,6 +815,7 @@ async def _exec_with_plugin(
815815
await self.session_service.append_event(
816816
session=session, event=buffered_event
817817
)
818+
yield buffered_event # yield buffered events to caller
818819
buffered_events = []
819820
else:
820821
# non-transcription event or empty transcription event, for

tests/unittests/streaming/test_streaming.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1009,3 +1009,114 @@ async def consume_responses(session: testing_utils.Session):
10091009

10101010
assert stock_call_found, 'Expected monitor_stock_price function call event.'
10111011
assert video_call_found, 'Expected monitor_video_stream function call event.'
1012+
1013+
1014+
def test_live_streaming_buffered_function_call_yielded_during_transcription():
1015+
"""Test that function calls buffered during transcription are yielded.
1016+
1017+
This tests the fix for the bug where function_call and function_response
1018+
events were buffered during active transcription but never yielded to the
1019+
caller. The fix ensures buffered events are yielded after transcription ends.
1020+
"""
1021+
function_call = types.Part.from_function_call(
1022+
name='get_weather', args={'location': 'San Francisco'}
1023+
)
1024+
1025+
response1 = LlmResponse(
1026+
input_transcription=types.Transcription(text='Show'),
1027+
partial=True, # ← Triggers is_transcribing = True
1028+
)
1029+
response2 = LlmResponse(
1030+
content=types.Content(
1031+
role='model', parts=[function_call]
1032+
), # ← Gets buffered
1033+
turn_complete=False,
1034+
)
1035+
response3 = LlmResponse(
1036+
input_transcription=types.Transcription(text='Show me the weather'),
1037+
partial=False, # ← Transcription ends, buffered events yielded
1038+
)
1039+
response4 = LlmResponse(
1040+
turn_complete=True,
1041+
)
1042+
1043+
mock_model = testing_utils.MockModel.create(
1044+
[response1, response2, response3, response4]
1045+
)
1046+
1047+
def get_weather(location: str) -> dict:
1048+
return {'temperature': 22, 'location': location}
1049+
1050+
root_agent = Agent(
1051+
name='root_agent',
1052+
model=mock_model,
1053+
tools=[get_weather],
1054+
)
1055+
1056+
class CustomTestRunner(testing_utils.InMemoryRunner):
1057+
1058+
def run_live(
1059+
self,
1060+
live_request_queue: LiveRequestQueue,
1061+
run_config: testing_utils.RunConfig = None,
1062+
) -> list[testing_utils.Event]:
1063+
collected_responses = []
1064+
1065+
async def consume_responses(session: testing_utils.Session):
1066+
run_res = self.runner.run_live(
1067+
session=session,
1068+
live_request_queue=live_request_queue,
1069+
run_config=run_config or testing_utils.RunConfig(),
1070+
)
1071+
1072+
async for response in run_res:
1073+
collected_responses.append(response)
1074+
if len(collected_responses) >= 5:
1075+
return
1076+
1077+
try:
1078+
session = self.session
1079+
loop = asyncio.new_event_loop()
1080+
asyncio.set_event_loop(loop)
1081+
try:
1082+
loop.run_until_complete(
1083+
asyncio.wait_for(consume_responses(session), timeout=5.0)
1084+
)
1085+
finally:
1086+
loop.close()
1087+
except (asyncio.TimeoutError, asyncio.CancelledError):
1088+
pass
1089+
1090+
return collected_responses
1091+
1092+
runner = CustomTestRunner(root_agent=root_agent)
1093+
live_request_queue = LiveRequestQueue()
1094+
live_request_queue.send_realtime(
1095+
blob=types.Blob(data=b'Show me the weather', mime_type='audio/pcm')
1096+
)
1097+
1098+
res_events = runner.run_live(live_request_queue)
1099+
1100+
assert res_events is not None, 'Expected a list of events, got None.'
1101+
assert len(res_events) >= 1, 'Expected at least one event.'
1102+
1103+
function_call_found = False
1104+
function_response_found = False
1105+
1106+
for event in res_events:
1107+
if event.content and event.content.parts:
1108+
for part in event.content.parts:
1109+
if part.function_call and part.function_call.name == 'get_weather':
1110+
function_call_found = True
1111+
assert part.function_call.args['location'] == 'San Francisco'
1112+
if (
1113+
part.function_response
1114+
and part.function_response.name == 'get_weather'
1115+
):
1116+
function_response_found = True
1117+
assert part.function_response.response['temperature'] == 22
1118+
1119+
assert function_call_found, 'Buffered function_call event was not yielded.'
1120+
assert (
1121+
function_response_found
1122+
), 'Buffered function_response event was not yielded.'

0 commit comments

Comments
 (0)