Skip to content

Commit 6f29896

Browse files
committed
Make code async
1 parent 8d82fd0 commit 6f29896

File tree

4 files changed

+94
-58
lines changed

4 files changed

+94
-58
lines changed

src/galileo/decorator.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -930,6 +930,24 @@ def start_session(
930930
name=name, previous_session_id=previous_session_id, external_id=external_id
931931
)
932932

933+
async def async_start_session(
934+
self, name: Optional[str] = None, previous_session_id: Optional[str] = None, external_id: Optional[str] = None
935+
) -> str:
936+
"""
937+
Async start a session in the active context logger instance.
938+
939+
Args:
940+
name: The name of the session. If not provided, a session name will be generated automatically.
941+
previous_session_id: The id of the previous session. Defaults to None.
942+
external_id: The external id of the session. Defaults to None.
943+
944+
Returns:
945+
str: The id of the newly created session.
946+
"""
947+
return await self.get_logger_instance().async_start_session(
948+
name=name, previous_session_id=previous_session_id, external_id=external_id
949+
)
950+
933951
def clear_session(self) -> None:
934952
"""Clear the session in the active context logger instance."""
935953
self.get_logger_instance().clear_session()

src/galileo/logger/logger.py

Lines changed: 36 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from galileo.utils.metrics import populate_local_metrics
3636
from galileo.utils.nop_logger import nop_async, nop_sync
3737
from galileo.utils.serialization import serialize_to_str
38+
from galileo_core.helpers.execution import async_run
3839
from galileo_core.schemas.logging.agent import AgentType
3940
from galileo_core.schemas.logging.span import (
4041
AgentSpan,
@@ -1016,22 +1017,8 @@ def conclude(
10161017

10171018
return current_parent
10181019

1019-
@nop_sync
1020-
def flush(self) -> list[Trace]:
1021-
"""
1022-
Upload all traces to Galileo.
1023-
1024-
Returns:
1025-
-------
1026-
List[Trace]: The list of uploaded traces.
1027-
"""
1028-
if self.mode == "batch":
1029-
return self._flush_batch()
1030-
else:
1031-
self._logger.warning("Flushing in streaming mode is not supported.")
1032-
return list()
1033-
1034-
def _flush_batch(self):
1020+
async def _flush_batch(self, is_async: bool = False) -> list[Trace]:
1021+
# import pdb; pdb.set_trace()
10351022
if not self.traces:
10361023
self._logger.info("No traces to flush.")
10371024
return list()
@@ -1044,7 +1031,7 @@ def _flush_batch(self):
10441031

10451032
if self.local_metrics:
10461033
self._logger.info("Computing local metrics...")
1047-
# TODO: parallelize, possibly with ThreadPoolExecutor
1034+
# TODO: parallelize, possibly with ThreadPoolExecutor/asyncio
10481035
for trace in self.traces:
10491036
populate_local_metrics(trace, self.local_metrics)
10501037

@@ -1053,7 +1040,11 @@ def _flush_batch(self):
10531040
traces_ingest_request = TracesIngestRequest(
10541041
traces=self.traces, experiment_id=self.experiment_id, session_id=self.session_id
10551042
)
1056-
self._client.ingest_traces_sync(traces_ingest_request)
1043+
1044+
if is_async:
1045+
await self._client.ingest_traces(traces_ingest_request)
else:
    async_run(self._client.ingest_traces(traces_ingest_request))
async_run(self._client.ingest_traces(traces_ingest_request))
1047+
10571048
logged_traces = self.traces
10581049

10591050
self._logger.info("Successfully flushed %d traces.", len(logged_traces))
@@ -1072,39 +1063,38 @@ async def async_flush(self) -> list[Trace]:
10721063
List[Trace]: The list of uploaded workflows.
10731064
"""
10741065
if self.mode == "batch":
1075-
return await self._async_flush_batch()
1066+
return await self._flush_batch(is_async=True)
10761067
else:
10771068
self._logger.warning("Flushing in streaming mode is not supported.")
10781069
return list()
10791070

1080-
async def _async_flush_batch(self) -> list[Trace]:
1081-
if not self.traces:
1082-
self._logger.info("No traces to flush.")
1083-
return list()
1084-
1085-
current_parent = self.current_parent()
1086-
if current_parent is not None:
1087-
self._logger.info("Concluding the active trace...")
1088-
last_output = get_last_output(current_parent)
1089-
self.conclude(output=last_output, conclude_all=True)
1090-
1091-
if self.local_metrics:
1092-
self._logger.info("Computing metrics for local scorers...")
1093-
# TODO: parallelize, possibly with asyncio to_thread/gather
1094-
for trace in self.traces:
1095-
populate_local_metrics(trace, self.local_metrics)
1096-
1097-
self._logger.info("Flushing %d traces...", len(self.traces))
1098-
1099-
traces_ingest_request = TracesIngestRequest(traces=self.traces, session_id=self.session_id)
1100-
await self._client.ingest_traces(traces_ingest_request)
1101-
logged_traces = self.traces
1102-
1103-
self._logger.info("Successfully flushed %d traces.", len(logged_traces))
1071+
@nop_sync
1072+
def flush(self) -> list[Trace]:
1073+
"""
1074+
Upload all traces to Galileo.
11041075
1105-
self.traces = list()
1106-
self._parent_stack = deque()
1107-
return logged_traces
1076+
Returns:
1077+
-------
1078+
List[Trace]: The list of uploaded traces.
1079+
"""
1080+
if self.mode == "batch":
1081+
# This is bad because asyncio.run() fails in environments with an existing event loop
1082+
# (e.g. Jupyter notebooks, FastAPI, etc.) with "asyncio.run() cannot be called from a running event loop".
1083+
# Even though flush() is sync, it can be called from async contexts like:
1084+
# - Jupyter notebooks (which have their own event loop)
1085+
# - pytest-asyncio tests (where @mark.asyncio creates an event loop)
1086+
# - FastAPI/Django async views (where the web framework has an event loop)
1087+
# - Any async function that calls sync code
1088+
# The EventLoopThreadPool approach works in ALL environments by using dedicated threads
1089+
# return asyncio.run(self._flush_batch(is_async=False))
1090+
1091+
# This is good because async_run() uses EventLoopThreadPool which works in all environments
1092+
# by running async code in dedicated threads with their own event loops
1093+
1094+
return async_run(self._flush_batch(is_async=False))
1095+
else:
1096+
self._logger.warning("Flushing in streaming mode is not supported.")
1097+
return list()
11081098

11091099
@nop_sync
11101100
def terminate(self) -> None:

src/galileo/utils/core_api_client.py

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -85,18 +85,6 @@ async def ingest_traces(self, traces_ingest_request: TracesIngestRequest) -> dic
8585
RequestMethod.POST, endpoint=Routes.traces.format(project_id=self.project_id), json=json
8686
)
8787

88-
def ingest_traces_sync(self, traces_ingest_request: TracesIngestRequest) -> dict[str, str]:
89-
if self.experiment_id:
90-
traces_ingest_request.experiment_id = UUID(self.experiment_id)
91-
elif self.log_stream_id:
92-
traces_ingest_request.log_stream_id = UUID(self.log_stream_id)
93-
94-
json = traces_ingest_request.model_dump(mode="json")
95-
96-
return self._make_request(
97-
RequestMethod.POST, endpoint=Routes.traces.format(project_id=self.project_id), json=json
98-
)
99-
10088
async def ingest_spans(self, spans_ingest_request: SpansIngestRequest) -> dict[str, str]:
10189
if self.experiment_id:
10290
spans_ingest_request.experiment_id = UUID(self.experiment_id)

tests/test_openai_agents.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,47 @@ async def test_simple_agent(
127127
assert span.metrics.duration_ns
128128
assert span.metrics.duration_ns > 0
129129

130+
await galileo_logger.async_flush()
131+
payload = mock_core_api_instance.ingest_traces.call_args[0][0]
132+
assert len(payload.traces) == 1
133+
assert len(payload.traces[0].spans) == 1
134+
135+
136+
@vcr.use_cassette(
137+
"tests/fixtures/openai_agents.yaml",
138+
filter_headers=["authorization"],
139+
decode_compressed_response=True,
140+
record_mode=vcr.mode.NEW_EPISODES,
141+
)
142+
@patch("galileo.logger.logger.LogStreams")
143+
@patch("galileo.logger.logger.Projects")
144+
@patch("galileo.logger.logger.GalileoCoreApiClient")
145+
def test_simple_agent_sync_flush(
146+
mock_core_api_client: Mock, mock_projects_client: Mock, mock_logstreams_client: Mock, monkeypatch: MonkeyPatch
147+
) -> None:
148+
"""Test sync flush() method - this test is NOT async"""
149+
mock_core_api_instance = setup_mock_core_api_client(mock_core_api_client)
150+
setup_mock_projects_client(mock_projects_client)
151+
setup_mock_logstreams_client(mock_logstreams_client)
152+
153+
galileo_logger = GalileoLogger(project="test", log_stream="test")
154+
155+
# Add a simple trace manually (no async Runner.run)
156+
galileo_logger.start_trace(input="Test input")
157+
galileo_logger.add_llm_span(
158+
input="Test input",
159+
output="Test output",
160+
model="gpt-4o",
161+
num_input_tokens=5,
162+
num_output_tokens=5,
163+
total_tokens=10,
164+
)
165+
galileo_logger.conclude(output="Test output")
166+
167+
# This should work since we're NOT in an async context
130168
galileo_logger.flush()
169+
170+
# Check that ingestion was performed — sync flush() now routes through the async ingest_traces
131171
payload = mock_core_api_instance.ingest_traces.call_args[0][0]
132172
assert len(payload.traces) == 1
133173
assert len(payload.traces[0].spans) == 1

0 commit comments

Comments
 (0)