Skip to content

Commit 436c772

Browse files
committed
Make code async
1 parent 8d82fd0 commit 436c772

File tree

3 files changed

+95
-46
lines changed

3 files changed

+95
-46
lines changed

src/galileo/decorator.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -930,6 +930,24 @@ def start_session(
930930
name=name, previous_session_id=previous_session_id, external_id=external_id
931931
)
932932

933+
async def async_start_session(
    self, name: Optional[str] = None, previous_session_id: Optional[str] = None, external_id: Optional[str] = None
) -> str:
    """
    Asynchronously start a session on the logger bound to the active context.

    Args:
        name: Session name; autogenerated by the backend when omitted.
        previous_session_id: Id of the preceding session. Defaults to None.
        external_id: External identifier for the session. Defaults to None.

    Returns:
        str: The id of the newly created session.
    """
    logger = self.get_logger_instance()
    return await logger.async_start_session(
        name=name, previous_session_id=previous_session_id, external_id=external_id
    )
933951
def clear_session(self) -> None:
934952
"""Clear the session in the active context logger instance."""
935953
self.get_logger_instance().clear_session()

src/galileo/logger/logger.py

Lines changed: 37 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1016,22 +1016,8 @@ def conclude(
10161016

10171017
return current_parent
10181018

1019-
@nop_sync
1020-
def flush(self) -> list[Trace]:
1021-
"""
1022-
Upload all traces to Galileo.
1023-
1024-
Returns:
1025-
-------
1026-
List[Trace]: The list of uploaded traces.
1027-
"""
1028-
if self.mode == "batch":
1029-
return self._flush_batch()
1030-
else:
1031-
self._logger.warning("Flushing in streaming mode is not supported.")
1032-
return list()
1033-
1034-
def _flush_batch(self):
1019+
async def _flush_batch(self, is_async: bool = False) -> list[Trace]:
1020+
# import pdb; pdb.set_trace()
10351021
if not self.traces:
10361022
self._logger.info("No traces to flush.")
10371023
return list()
@@ -1044,7 +1030,7 @@ def _flush_batch(self):
10441030

10451031
if self.local_metrics:
10461032
self._logger.info("Computing local metrics...")
1047-
# TODO: parallelize, possibly with ThreadPoolExecutor
1033+
# TODO: parallelize, possibly with ThreadPoolExecutor/asyncio
10481034
for trace in self.traces:
10491035
populate_local_metrics(trace, self.local_metrics)
10501036

@@ -1053,7 +1039,12 @@ def _flush_batch(self):
10531039
traces_ingest_request = TracesIngestRequest(
10541040
traces=self.traces, experiment_id=self.experiment_id, session_id=self.session_id
10551041
)
1056-
self._client.ingest_traces_sync(traces_ingest_request)
1042+
1043+
if is_async:
1044+
await self._client.ingest_traces(traces_ingest_request)
1045+
else:
1046+
self._client.ingest_traces_sync(traces_ingest_request)
1047+
10571048
logged_traces = self.traces
10581049

10591050
self._logger.info("Successfully flushed %d traces.", len(logged_traces))
@@ -1072,39 +1063,39 @@ async def async_flush(self) -> list[Trace]:
10721063
List[Trace]: The list of uploaded workflows.
10731064
"""
10741065
if self.mode == "batch":
1075-
return await self._async_flush_batch()
1066+
return await self._flush_batch(is_async=True)
10761067
else:
10771068
self._logger.warning("Flushing in streaming mode is not supported.")
10781069
return list()
10791070

1080-
async def _async_flush_batch(self) -> list[Trace]:
1081-
if not self.traces:
1082-
self._logger.info("No traces to flush.")
1083-
return list()
1084-
1085-
current_parent = self.current_parent()
1086-
if current_parent is not None:
1087-
self._logger.info("Concluding the active trace...")
1088-
last_output = get_last_output(current_parent)
1089-
self.conclude(output=last_output, conclude_all=True)
1090-
1091-
if self.local_metrics:
1092-
self._logger.info("Computing metrics for local scorers...")
1093-
# TODO: parallelize, possibly with asyncio to_thread/gather
1094-
for trace in self.traces:
1095-
populate_local_metrics(trace, self.local_metrics)
1096-
1097-
self._logger.info("Flushing %d traces...", len(self.traces))
1098-
1099-
traces_ingest_request = TracesIngestRequest(traces=self.traces, session_id=self.session_id)
1100-
await self._client.ingest_traces(traces_ingest_request)
1101-
logged_traces = self.traces
1102-
1103-
self._logger.info("Successfully flushed %d traces.", len(logged_traces))
1071+
@nop_sync
def flush(self) -> list[Trace]:
    """
    Upload all traces to Galileo.

    Returns:
    -------
    List[Trace]: The list of uploaded traces.
    """
    if self.mode != "batch":
        self._logger.warning("Flushing in streaming mode is not supported.")
        return list()

    # asyncio.run() cannot be used here: it raises "cannot be called from a
    # running event loop" in environments that already own a loop (Jupyter
    # notebooks, pytest-asyncio tests, FastAPI/Django async views). async_run
    # runs the coroutine on a dedicated thread with its own event loop, so
    # this synchronous entry point is safe to call from any context.
    from galileo_core.helpers.execution import async_run

    return async_run(self._flush_batch(is_async=False))
11081099

11091100
@nop_sync
11101101
def terminate(self) -> None:

tests/test_openai_agents.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,47 @@ async def test_simple_agent(
127127
assert span.metrics.duration_ns
128128
assert span.metrics.duration_ns > 0
129129

130+
await galileo_logger.async_flush()
131+
payload = mock_core_api_instance.ingest_traces.call_args[0][0]
132+
assert len(payload.traces) == 1
133+
assert len(payload.traces[0].spans) == 1
134+
135+
136+
@vcr.use_cassette(
    "tests/fixtures/openai_agents.yaml",
    filter_headers=["authorization"],
    decode_compressed_response=True,
    record_mode=vcr.mode.NEW_EPISODES,
)
@patch("galileo.logger.logger.LogStreams")
@patch("galileo.logger.logger.Projects")
@patch("galileo.logger.logger.GalileoCoreApiClient")
def test_simple_agent_sync_flush(
    mock_core_api_client: Mock, mock_projects_client: Mock, mock_logstreams_client: Mock, monkeypatch: MonkeyPatch
) -> None:
    """Test sync flush() method - this test is NOT async"""
    # Wire up the mocked backend clients.
    core_api = setup_mock_core_api_client(mock_core_api_client)
    setup_mock_projects_client(mock_projects_client)
    setup_mock_logstreams_client(mock_logstreams_client)

    logger = GalileoLogger(project="test", log_stream="test")

    # Build a minimal trace by hand instead of driving an async agent run.
    logger.start_trace(input="Test input")
    logger.add_llm_span(
        input="Test input",
        output="Test output",
        model="gpt-4o",
        num_input_tokens=5,
        num_output_tokens=5,
        total_tokens=10,
    )
    logger.conclude(output="Test output")

    # No event loop is running in this sync test, so flush() must succeed.
    logger.flush()

    # The synchronous ingestion path must have been used.
    payload = core_api.ingest_traces_sync.call_args[0][0]
    assert len(payload.traces) == 1
    assert len(payload.traces[0].spans) == 1

0 commit comments

Comments
 (0)