Skip to content

Commit 9579bea

Browse files
google-genai-botcopybara-github
authored andcommitted
feat(plugins): Add flush mechanism to BigQueryAgentAnalyticsPlugin
This change introduces a `flush` method to the `BigQueryAgentAnalyticsPlugin`. This ensures that all pending log events are written to BigQuery before the agent's run completes. Key changes: - Added `flush()` method to `BigQueryAgentAnalyticsPlugin` to force write of pending events. PiperOrigin-RevId: 859263853
1 parent 3e3566b commit 9579bea

File tree

2 files changed

+39
-0
lines changed

2 files changed

+39
-0
lines changed

src/google/adk/plugins/bigquery_agent_analytics_plugin.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -699,6 +699,13 @@ def __init__(
699699
self._batch_processor_task: Optional[asyncio.Task] = None
700700
self._shutdown = False
701701

702+
async def flush(self) -> None:
703+
"""Flushes the queue by waiting for it to be empty."""
704+
if self._queue.empty():
705+
return
706+
# Wait for all items in the queue to be processed
707+
await self._queue.join()
708+
702709
async def start(self):
703710
"""Starts the batch writer worker task."""
704711
if self._batch_processor_task is None:
@@ -1516,6 +1523,11 @@ def _format_content_safely(
15161523
logger.warning("Content formatter failed: %s", e)
15171524
return "[FORMATTING FAILED]", False
15181525

1526+
async def flush(self) -> None:
1527+
"""Flushes any pending events to BigQuery."""
1528+
if self.batch_processor:
1529+
await self.batch_processor.flush()
1530+
15191531
async def _lazy_setup(self, **kwargs) -> None:
15201532
"""Performs lazy initialization of BigQuery clients and resources."""
15211533
if self._started:
@@ -1947,6 +1959,8 @@ async def after_run_callback(
19471959
await self._log_event(
19481960
"INVOCATION_COMPLETED", CallbackContext(invocation_context)
19491961
)
1962+
# Ensure all logs are flushed before the agent returns
1963+
await self.flush()
19501964

19511965
async def before_agent_callback(
19521966
self, *, agent: Any, callback_context: CallbackContext, **kwargs

tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2061,3 +2061,28 @@ async def test_otel_integration_real_provider(self, callback_context):
20612061
assert finished_spans[0].name == "test_span"
20622062
assert format(finished_spans[0].context.span_id, "016x") == span_id
20632063
assert format(finished_spans[0].context.trace_id, "032x") == trace_id
2064+
2065+
@pytest.mark.asyncio
2066+
async def test_flush_mechanism(
2067+
self,
2068+
bq_plugin_inst,
2069+
mock_write_client,
2070+
dummy_arrow_schema,
2071+
invocation_context,
2072+
):
2073+
"""Verifies that flush() forces pending events to be written."""
2074+
# Log an event
2075+
bigquery_agent_analytics_plugin.TraceManager.push_span(invocation_context)
2076+
await bq_plugin_inst.before_run_callback(
2077+
invocation_context=invocation_context
2078+
)
2079+
2080+
# Call flush - this should block until the event is written
2081+
await bq_plugin_inst.flush()
2082+
2083+
# Verify write called
2084+
mock_write_client.append_rows.assert_called_once()
2085+
log_entry = await _get_captured_event_dict_async(
2086+
mock_write_client, dummy_arrow_schema
2087+
)
2088+
assert log_entry["event_type"] == "INVOCATION_STARTING"

0 commit comments

Comments
 (0)