diff --git a/src/galileo/decorator.py b/src/galileo/decorator.py index 6cf4967f0..c6b1fe008 100644 --- a/src/galileo/decorator.py +++ b/src/galileo/decorator.py @@ -930,6 +930,24 @@ def start_session( name=name, previous_session_id=previous_session_id, external_id=external_id ) + async def async_start_session( + self, name: Optional[str] = None, previous_session_id: Optional[str] = None, external_id: Optional[str] = None + ) -> str: + """ + Async start a session in the active context logger instance. + + Args: + name: The name of the session. If not provided, a session name will be generated automatically. + previous_session_id: The id of the previous session. Defaults to None. + external_id: The external id of the session. Defaults to None. + + Returns: + str: The id of the newly created session. + """ + return await self.get_logger_instance().async_start_session( + name=name, previous_session_id=previous_session_id, external_id=external_id + ) + def clear_session(self) -> None: """Clear the session in the active context logger instance.""" self.get_logger_instance().clear_session() diff --git a/src/galileo/logger/logger.py b/src/galileo/logger/logger.py index fba4e0475..5175114d7 100644 --- a/src/galileo/logger/logger.py +++ b/src/galileo/logger/logger.py @@ -1016,22 +1016,8 @@ def conclude( return current_parent - @nop_sync - def flush(self) -> list[Trace]: - """ - Upload all traces to Galileo. - - Returns: - ------- - List[Trace]: The list of uploaded traces. 
- """ - if self.mode == "batch": - return self._flush_batch() - else: - self._logger.warning("Flushing in streaming mode is not supported.") - return list() - - def _flush_batch(self): + async def _flush_batch(self, is_async: bool = False) -> list[Trace]: + # import pdb; pdb.set_trace() if not self.traces: self._logger.info("No traces to flush.") return list() @@ -1044,7 +1030,7 @@ def _flush_batch(self): if self.local_metrics: self._logger.info("Computing local metrics...") - # TODO: parallelize, possibly with ThreadPoolExecutor + # TODO: parallelize, possibly with ThreadPoolExecutor/asyncio for trace in self.traces: populate_local_metrics(trace, self.local_metrics) @@ -1053,7 +1039,16 @@ def _flush_batch(self): traces_ingest_request = TracesIngestRequest( traces=self.traces, experiment_id=self.experiment_id, session_id=self.session_id ) - self._client.ingest_traces_sync(traces_ingest_request) + + if is_async: + await self._client.ingest_traces(traces_ingest_request) + else: + # Use async_run() instead of asyncio.run() to work in all environments + # (Jupyter notebooks, pytest-asyncio, FastAPI, etc.) + from galileo_core.helpers.execution import async_run + + async_run(self._client.ingest_traces(traces_ingest_request)) + logged_traces = self.traces self._logger.info("Successfully flushed %d traces.", len(logged_traces)) @@ -1072,39 +1067,39 @@ async def async_flush(self) -> list[Trace]: List[Trace]: The list of uploaded workflows. 
""" if self.mode == "batch": - return await self._async_flush_batch() + return await self._flush_batch(is_async=True) else: self._logger.warning("Flushing in streaming mode is not supported.") return list() - async def _async_flush_batch(self) -> list[Trace]: - if not self.traces: - self._logger.info("No traces to flush.") - return list() - - current_parent = self.current_parent() - if current_parent is not None: - self._logger.info("Concluding the active trace...") - last_output = get_last_output(current_parent) - self.conclude(output=last_output, conclude_all=True) - - if self.local_metrics: - self._logger.info("Computing metrics for local scorers...") - # TODO: parallelize, possibly with asyncio to_thread/gather - for trace in self.traces: - populate_local_metrics(trace, self.local_metrics) - - self._logger.info("Flushing %d traces...", len(self.traces)) - - traces_ingest_request = TracesIngestRequest(traces=self.traces, session_id=self.session_id) - await self._client.ingest_traces(traces_ingest_request) - logged_traces = self.traces - - self._logger.info("Successfully flushed %d traces.", len(logged_traces)) + @nop_sync + def flush(self) -> list[Trace]: + """ + Upload all traces to Galileo. - self.traces = list() - self._parent_stack = deque() - return logged_traces + Returns: + ------- + List[Trace]: The list of uploaded traces. + """ + if self.mode == "batch": + # This is bad because asyncio.run() fails in environments with existing event loops + # (e.g. jupyter notebooks, FastAPI, etc. 
would fail with "cannot be called from a running event loop" + # Even though flush() is sync, it can be called from async contexts like: + # - Jupyter notebooks (which have their own event loop) + # - pytest-asyncio tests (where @mark.asyncio creates an event loop) + # - FastAPI/Django async views (where the web framework has an event loop) + # - Any async function that calls sync code + # The EventLoopThreadPool approach works in ALL environments by using dedicated threads + # return asyncio.run(self._flush_batch(is_async=False)) + + # This is good because async_run() uses EventLoopThreadPool which works in all environments + # by running async code in dedicated threads with their own event loops + from galileo_core.helpers.execution import async_run + + return async_run(self._flush_batch(is_async=False)) + else: + self._logger.warning("Flushing in streaming mode is not supported.") + return list() @nop_sync def terminate(self) -> None: diff --git a/src/galileo/utils/core_api_client.py b/src/galileo/utils/core_api_client.py index 7e049b523..95ab3b3d7 100644 --- a/src/galileo/utils/core_api_client.py +++ b/src/galileo/utils/core_api_client.py @@ -85,18 +85,6 @@ async def ingest_traces(self, traces_ingest_request: TracesIngestRequest) -> dic RequestMethod.POST, endpoint=Routes.traces.format(project_id=self.project_id), json=json ) - def ingest_traces_sync(self, traces_ingest_request: TracesIngestRequest) -> dict[str, str]: - if self.experiment_id: - traces_ingest_request.experiment_id = UUID(self.experiment_id) - elif self.log_stream_id: - traces_ingest_request.log_stream_id = UUID(self.log_stream_id) - - json = traces_ingest_request.model_dump(mode="json") - - return self._make_request( - RequestMethod.POST, endpoint=Routes.traces.format(project_id=self.project_id), json=json - ) - async def ingest_spans(self, spans_ingest_request: SpansIngestRequest) -> dict[str, str]: if self.experiment_id: spans_ingest_request.experiment_id = UUID(self.experiment_id) diff --git 
a/tests/test_decorator.py b/tests/test_decorator.py index 8b1bec82f..73fe21518 100644 --- a/tests/test_decorator.py +++ b/tests/test_decorator.py @@ -90,7 +90,7 @@ def llm_call(query: str): galileo_context.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert len(payload.traces) == 1 assert len(payload.traces[0].spans) == 1 @@ -130,7 +130,7 @@ def llm_call(query: str): galileo_context.flush(project="project-X", log_stream="log-stream-X") - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert len(payload.traces) == 1 assert len(payload.traces[0].spans) == 1 @@ -139,7 +139,7 @@ def llm_call(query: str): galileo_context.flush(project="project-Y", log_stream="log-stream-Y") - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert len(payload.traces) == 1 assert len(payload.traces[0].spans) == 1 @@ -211,7 +211,7 @@ def llm_call(query: str): llm_call(query="input") galileo_context.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert len(payload.traces) == 1 assert len(payload.traces[0].spans) == 1 @@ -238,7 +238,7 @@ def my_function(arg1, arg2): my_function(1, 2) galileo_context.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert len(payload.traces) == 1 assert len(payload.traces[0].spans) == 1 @@ -267,7 +267,7 @@ def my_function(system: Message, user: Message): ) galileo_context.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert len(payload.traces) == 1 assert len(payload.traces[0].spans) == 1 @@ -302,7 +302,7 @@ def 
my_function(system: Message, user: Message): ) galileo_context.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert len(payload.traces) == 1 assert len(payload.traces[0].spans) == 1 @@ -335,7 +335,7 @@ def my_function(arg1: str, arg2: str): my_function("arg1", "arg2") galileo_context.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert len(payload.traces) == 1 assert len(payload.traces[0].spans) == 1 @@ -362,7 +362,7 @@ def my_function(arg1: str, arg2: str): my_function("arg1", "arg2") galileo_context.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert len(payload.traces) == 1 assert len(payload.traces[0].spans) == 1 @@ -394,7 +394,7 @@ def my_function(arg1: str, arg2: str): my_function("arg1", "arg2") galileo_context.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert len(payload.traces) == 1 assert len(payload.traces[0].spans) == 1 @@ -431,7 +431,7 @@ def nested_call(nested_query: str): output = nested_call(nested_query="input") galileo_context.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert len(payload.traces) == 1 assert len(payload.traces[0].spans) == 1 @@ -468,7 +468,7 @@ def nested_call(nested_query: str): output = nested_call(nested_query="input") galileo_context.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert len(payload.traces) == 1 assert len(payload.traces[0].spans) == 1 @@ -500,7 +500,7 @@ def retriever_call(query: str): retriever_call(query="input") 
galileo_context.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert isinstance(payload.traces[0].spans[0], RetrieverSpan) assert payload.traces[0].spans[0].input == '{"query": "input"}' @@ -524,7 +524,7 @@ def retriever_call(query: str): retriever_call(query="input") galileo_context.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert isinstance(payload.traces[0].spans[0], RetrieverSpan) assert payload.traces[0].spans[0].input == '{"query": "input"}' @@ -551,7 +551,7 @@ def retriever_call(query: str): retriever_call(query="input") galileo_context.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert isinstance(payload.traces[0].spans[0], RetrieverSpan) assert payload.traces[0].spans[0].input == '{"query": "input"}' @@ -578,7 +578,7 @@ def retriever_call(query: str): retriever_call(query="input") galileo_context.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert isinstance(payload.traces[0].spans[0], RetrieverSpan) assert payload.traces[0].spans[0].input == '{"query": "input"}' @@ -607,7 +607,7 @@ def foo(): galileo_context.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert len(payload.traces) == 1 assert len(payload.traces[0].spans) == 1 @@ -637,7 +637,7 @@ def foo(): galileo_context.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert payload.session_id == UUID("6c4e3f7e-4a9a-4e7e-8c1f-3a9a3a9a3a9c") @@ -664,7 +664,7 @@ def foo(): galileo_context.flush() - payload = 
mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert payload.session_id == UUID("6c4e3f7e-4a9a-4e7e-8c1f-3a9a3a9a3a9c") @@ -695,7 +695,7 @@ def foo(): galileo_context.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert payload.session_id is None @@ -722,7 +722,7 @@ def foo(): galileo_context.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert payload.session_id == UUID("6c4e3f7e-4a9a-4e7e-8c1f-3a9a3a9a3a9c") @@ -751,7 +751,7 @@ def foo(input: str): logger.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert payload.traces[0].input == "test input" assert payload.traces[0].output == "test output" diff --git a/tests/test_experiments.py b/tests/test_experiments.py index f6ae569b7..a7f7e3833 100644 --- a/tests/test_experiments.py +++ b/tests/test_experiments.py @@ -412,7 +412,7 @@ def test_run_experiment_with_func( mock_get_dataset_instance.get_content.assert_called() # check galileo_logger - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert len(payload.traces) == 1 trace = payload.traces[0] @@ -594,7 +594,7 @@ def runner(input): mock_get_dataset_instance.get_content.assert_called() # check galileo_logger - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert len(payload.traces) == 1 assert ( payload.traces[0].input == '{"input": {"question": "Which continent is Spain in?", "expected": "Europe"}}' diff --git a/tests/test_logger_batch.py b/tests/test_logger_batch.py index 400c5a239..3fdb6803c 100644 --- a/tests/test_logger_batch.py 
+++ b/tests/test_logger_batch.py @@ -57,7 +57,7 @@ def test_disable_galileo_logger(mock_core_api_client: Mock, monkeypatch, caplog) assert "Bypassing logging for conclude. Logging is currently disabled." in caplog.text assert "Bypassing logging for flush. Logging is currently disabled." in caplog.text mock_core_api_client.assert_not_called() - mock_core_api_client.ingest_traces_sync.assert_not_called() + mock_core_api_client.ingest_traces.assert_not_called() @patch("galileo.logger.logger.LogStreams") @@ -91,7 +91,7 @@ def test_single_span_trace_to_galileo( logger.conclude("output", status_code=200) logger.flush() - payload: TracesIngestRequest = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload: TracesIngestRequest = mock_core_api_instance.ingest_traces.call_args[0][0] expected_payload = TracesIngestRequest( log_stream_id=None, # TODO: fix this experiment_id=None, @@ -203,7 +203,7 @@ def test_all_span_types_with_redacted_fields( logger.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] trace = payload.traces[0] assert trace.input == "Sensitive trace input: api_key_123" @@ -268,7 +268,7 @@ def test_single_span_trace_to_galileo_experiment_id( logger.conclude("output", status_code=200) logger.flush() - payload: TracesIngestRequest = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload: TracesIngestRequest = mock_core_api_instance.ingest_traces.call_args[0][0] expected_payload = TracesIngestRequest( log_stream_id=None, experiment_id="6c4e3f7e-4a9a-4e7e-8c1f-3a9a3a9a3a9a", @@ -337,7 +337,7 @@ def test_nested_span_trace_to_galileo( logger.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] expected_payload = TracesIngestRequest( log_stream_id=None, # TODO: fix this experiment_id=None, @@ -368,7 +368,7 @@ def test_add_agent_span(mock_core_api_client: Mock, 
mock_projects_client: Mock, logger.conclude(output="response", duration_ns=1_000_000, status_code=200) logger.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] expected_payload = TracesIngestRequest(log_stream_id=None, experiment_id=None, traces=[trace]) assert payload == expected_payload assert isinstance(payload.traces[0].spans[0], AgentSpan) @@ -429,7 +429,7 @@ def test_multi_span_trace_to_galileo( logger.flush() - payload: TracesIngestRequest = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload: TracesIngestRequest = mock_core_api_instance.ingest_traces.call_args[0][0] expected_payload = TracesIngestRequest( log_stream_id=None, # TODO: fix this experiment_id=None, @@ -551,7 +551,7 @@ def test_retriever_span_str_output( logger.conclude("output", status_code=200) logger.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert isinstance(payload.traces[0].spans[0], RetrieverSpan) assert payload.traces[0].spans[0].input == "prompt" @@ -577,7 +577,7 @@ def test_retriever_span_list_str_output( logger.conclude("output", status_code=200) logger.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert isinstance(payload.traces[0].spans[0], RetrieverSpan) assert payload.traces[0].spans[0].input == "prompt" @@ -613,7 +613,7 @@ def test_retriever_span_dict_output( logger.conclude("output", status_code=200) logger.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert isinstance(payload.traces[0].spans[0], RetrieverSpan) assert payload.traces[0].spans[0].input == "prompt" @@ -655,7 +655,7 @@ def test_retriever_span_list_dict_output( logger.conclude("output", status_code=200) logger.flush() 
- payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert isinstance(payload.traces[0].spans[0], RetrieverSpan) assert payload.traces[0].spans[0].input == "prompt" @@ -692,7 +692,7 @@ def test_retriever_span_document_output( logger.conclude("output", status_code=200) logger.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert isinstance(payload.traces[0].spans[0], RetrieverSpan) assert payload.traces[0].spans[0].input == "prompt" @@ -722,7 +722,7 @@ def test_retriever_span_list_document_output( logger.conclude("output", status_code=200) logger.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert isinstance(payload.traces[0].spans[0], RetrieverSpan) assert payload.traces[0].spans[0].input == "prompt" @@ -749,7 +749,7 @@ def test_retriever_span_none_output( logger.conclude("output", status_code=200) logger.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert isinstance(payload.traces[0].spans[0], RetrieverSpan) assert payload.traces[0].spans[0].input == "prompt" @@ -830,7 +830,7 @@ def test_flush_with_conclude_all_spans( logger.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert len(payload.traces) == 1 assert len(payload.traces[0].spans) == 1 @@ -1090,7 +1090,7 @@ def test_session_id_on_flush( logger.conclude("output", status_code=200) logger.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert str(payload.session_id) == session_id == "6c4e3f7e-4a9a-4e7e-8c1f-3a9a3a9a3a9c" @@ -1116,7 +1116,7 @@ def 
test_set_session_id(mock_core_api_client: Mock, mock_projects_client: Mock, logger.flush() # Check that the session ID is set correctly in the payload - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert payload.session_id == UUID(session_id) @@ -1188,7 +1188,7 @@ def test_start_session_with_external_id( logger.flush() # Check that the session ID is set correctly in the payload - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert payload.session_id == session_id diff --git a/tests/test_logger_streaming.py b/tests/test_logger_streaming.py index f904952f1..e0fa77409 100644 --- a/tests/test_logger_streaming.py +++ b/tests/test_logger_streaming.py @@ -53,7 +53,7 @@ def test_disable_galileo_logger(mock_core_api_client: Mock, monkeypatch, caplog) assert len(captured_tasks) == 0 mock_core_api_client.assert_not_called() - mock_core_api_client.ingest_traces_sync.assert_not_called() + mock_core_api_client.ingest_traces.assert_not_called() mock_core_api_client.ingest_spans_sync.assert_not_called() mock_core_api_client.update_trace_sync.assert_not_called() mock_core_api_client.update_span_sync.assert_not_called() @@ -81,7 +81,9 @@ def test_start_trace(mock_core_api_client: Mock, mock_projects_client: Mock, moc request = captured_task.request assert isinstance(request, TracesIngestRequest) - asyncio.run(captured_task.task_func()) + from galileo_core.helpers.execution import async_run + + async_run(captured_task.task_func()) mock_core_api_client_instance.ingest_traces.assert_called_with(request) assert request.traces[0].input == "input" @@ -128,7 +130,9 @@ def test_add_llm_span(mock_core_api_client: Mock, mock_projects_client: Mock, mo request = captured_task.request assert isinstance(request, TracesIngestRequest) - asyncio.run(captured_task.task_func()) + from galileo_core.helpers.execution import 
async_run + + async_run(captured_task.task_func()) mock_core_api_client_instance.ingest_traces.assert_called_with(request) assert request.traces[0].input == "input" @@ -143,7 +147,9 @@ def test_add_llm_span(mock_core_api_client: Mock, mock_projects_client: Mock, mo request = captured_task.request assert isinstance(request, SpansIngestRequest) - asyncio.run(captured_task.task_func()) + from galileo_core.helpers.execution import async_run + + async_run(captured_task.task_func()) mock_core_api_client_instance.ingest_spans.assert_called_with(request) assert request.trace_id == trace_id @@ -185,7 +191,9 @@ def test_conclude_trace(mock_core_api_client: Mock, mock_projects_client: Mock, request = captured_task.request assert isinstance(request, TracesIngestRequest) - asyncio.run(captured_task.task_func()) + from galileo_core.helpers.execution import async_run + + async_run(captured_task.task_func()) mock_core_api_client_instance.ingest_traces.assert_called_with(request) assert request.traces[0].type == "trace" @@ -203,7 +211,9 @@ def test_conclude_trace(mock_core_api_client: Mock, mock_projects_client: Mock, request = captured_task.request assert isinstance(request, TraceUpdateRequest) - asyncio.run(captured_task.task_func()) + from galileo_core.helpers.execution import async_run + + async_run(captured_task.task_func()) mock_core_api_client_instance.update_trace.assert_called_with(request) assert request.trace_id == trace_id @@ -253,7 +263,9 @@ def test_conclude_trace_with_span( request = captured_task.request assert isinstance(request, TracesIngestRequest) - asyncio.run(captured_task.task_func()) + from galileo_core.helpers.execution import async_run + + async_run(captured_task.task_func()) mock_core_api_client_instance.ingest_traces.assert_called_with(request) assert request.traces[0].type == "trace" @@ -271,7 +283,9 @@ def test_conclude_trace_with_span( request = captured_task.request assert isinstance(request, SpansIngestRequest) - 
asyncio.run(captured_task.task_func()) + from galileo_core.helpers.execution import async_run + + async_run(captured_task.task_func()) mock_core_api_client_instance.ingest_spans.assert_called_with(request) assert request.trace_id == trace_id @@ -290,7 +304,9 @@ def test_conclude_trace_with_span( request = captured_task.request assert isinstance(request, TraceUpdateRequest) - asyncio.run(captured_task.task_func()) + from galileo_core.helpers.execution import async_run + + async_run(captured_task.task_func()) mock_core_api_client_instance.update_trace.assert_called_with(request) assert request.trace_id == trace_id @@ -352,7 +368,9 @@ def test_conclude_trace_and_start_new_trace( request = captured_task.request assert isinstance(request, TracesIngestRequest) - asyncio.run(captured_task.task_func()) + from galileo_core.helpers.execution import async_run + + async_run(captured_task.task_func()) mock_core_api_client_instance.ingest_traces.assert_called_with(request) assert request.traces[0].type == "trace" @@ -371,7 +389,9 @@ def test_conclude_trace_and_start_new_trace( request = captured_task.request assert isinstance(request, SpansIngestRequest) - asyncio.run(captured_task.task_func()) + from galileo_core.helpers.execution import async_run + + async_run(captured_task.task_func()) mock_core_api_client_instance.ingest_spans.assert_called_with(request) assert request.trace_id == trace_id @@ -391,7 +411,9 @@ def test_conclude_trace_and_start_new_trace( request = captured_task.request assert isinstance(request, TraceUpdateRequest) - asyncio.run(captured_task.task_func()) + from galileo_core.helpers.execution import async_run + + async_run(captured_task.task_func()) mock_core_api_client_instance.update_trace.assert_called_with(request) assert request.trace_id == trace_id @@ -404,7 +426,9 @@ def test_conclude_trace_and_start_new_trace( request = captured_task.request assert isinstance(request, TracesIngestRequest) - asyncio.run(captured_task.task_func()) + from 
galileo_core.helpers.execution import async_run + + async_run(captured_task.task_func()) mock_core_api_client_instance.ingest_traces.assert_called_with(request) assert request.traces[0].type == "trace" @@ -468,7 +492,9 @@ def test_conclude_trace_with_nested_span( request = captured_task.request assert isinstance(request, TracesIngestRequest) - asyncio.run(captured_task.task_func()) + from galileo_core.helpers.execution import async_run + + async_run(captured_task.task_func()) mock_core_api_client_instance.ingest_traces.assert_called_with(request) assert request.traces[0].type == "trace" @@ -487,7 +513,9 @@ def test_conclude_trace_with_nested_span( request = captured_task.request assert isinstance(request, SpansIngestRequest) - asyncio.run(captured_task.task_func()) + from galileo_core.helpers.execution import async_run + + async_run(captured_task.task_func()) mock_core_api_client_instance.ingest_spans.assert_called_with(request) assert request.trace_id == trace_id @@ -508,7 +536,9 @@ def test_conclude_trace_with_nested_span( request = captured_task.request assert isinstance(request, SpansIngestRequest) - asyncio.run(captured_task.task_func()) + from galileo_core.helpers.execution import async_run + + async_run(captured_task.task_func()) mock_core_api_client_instance.ingest_spans.assert_called_with(request) assert request.trace_id == trace_id @@ -529,7 +559,9 @@ def test_conclude_trace_with_nested_span( request = captured_task.request assert isinstance(request, SpanUpdateRequest) - asyncio.run(captured_task.task_func()) + from galileo_core.helpers.execution import async_run + + async_run(captured_task.task_func()) mock_core_api_client_instance.update_span.assert_called_with(request) assert request.span_id == workflow_span_id @@ -541,7 +573,9 @@ def test_conclude_trace_with_nested_span( request = captured_task.request assert isinstance(request, TraceUpdateRequest) - asyncio.run(captured_task.task_func()) + from galileo_core.helpers.execution import async_run + + 
async_run(captured_task.task_func()) mock_core_api_client_instance.update_trace.assert_called_with(request) assert request.trace_id == trace_id @@ -595,7 +629,9 @@ def test_conclude_all_with_nested_span( request = captured_task.request assert isinstance(request, TracesIngestRequest) - asyncio.run(captured_task.task_func()) + from galileo_core.helpers.execution import async_run + + async_run(captured_task.task_func()) mock_core_api_client_instance.ingest_traces.assert_called_with(request) assert request.traces[0].type == "trace" @@ -613,7 +649,9 @@ def test_conclude_all_with_nested_span( request = captured_task.request assert isinstance(request, SpansIngestRequest) - asyncio.run(captured_task.task_func()) + from galileo_core.helpers.execution import async_run + + async_run(captured_task.task_func()) mock_core_api_client_instance.ingest_spans.assert_called_with(request) assert request.trace_id == trace_id @@ -632,7 +670,9 @@ def test_conclude_all_with_nested_span( request = captured_task.request assert isinstance(request, SpansIngestRequest) - asyncio.run(captured_task.task_func()) + from galileo_core.helpers.execution import async_run + + async_run(captured_task.task_func()) mock_core_api_client_instance.ingest_spans.assert_called_with(request) assert request.trace_id == trace_id @@ -652,7 +692,9 @@ def test_conclude_all_with_nested_span( request = captured_task.request assert isinstance(request, SpanUpdateRequest) - asyncio.run(captured_task.task_func()) + from galileo_core.helpers.execution import async_run + + async_run(captured_task.task_func()) mock_core_api_client_instance.update_span.assert_called_with(request) assert request.span_id == workflow_span_id @@ -664,7 +706,9 @@ def test_conclude_all_with_nested_span( request = captured_task.request assert isinstance(request, TraceUpdateRequest) - asyncio.run(captured_task.task_func()) + from galileo_core.helpers.execution import async_run + + async_run(captured_task.task_func()) 
mock_core_api_client_instance.update_trace.assert_called_with(request) assert request.trace_id == trace_id @@ -728,7 +772,9 @@ def test_conclude_trace_with_agent_span( request = captured_task.request assert isinstance(request, TracesIngestRequest) - asyncio.run(captured_task.task_func()) + from galileo_core.helpers.execution import async_run + + async_run(captured_task.task_func()) mock_core_api_client_instance.ingest_traces.assert_called_with(request) assert request.traces[0].type == "trace" @@ -746,7 +792,9 @@ def test_conclude_trace_with_agent_span( request = captured_task.request assert isinstance(request, SpansIngestRequest) - asyncio.run(captured_task.task_func()) + from galileo_core.helpers.execution import async_run + + async_run(captured_task.task_func()) mock_core_api_client_instance.ingest_spans.assert_called_with(request) assert request.trace_id == trace_id @@ -767,7 +815,9 @@ def test_conclude_trace_with_agent_span( request = captured_task.request assert isinstance(request, SpansIngestRequest) - asyncio.run(captured_task.task_func()) + from galileo_core.helpers.execution import async_run + + async_run(captured_task.task_func()) mock_core_api_client_instance.ingest_spans.assert_called_with(request) assert request.trace_id == trace_id @@ -788,7 +838,9 @@ def test_conclude_trace_with_agent_span( request = captured_task.request assert isinstance(request, SpanUpdateRequest) - asyncio.run(captured_task.task_func()) + from galileo_core.helpers.execution import async_run + + async_run(captured_task.task_func()) mock_core_api_client_instance.update_span.assert_called_with(request) assert request.span_id == agent_span_id @@ -800,7 +852,9 @@ def test_conclude_trace_with_agent_span( request = captured_task.request assert isinstance(request, TraceUpdateRequest) - asyncio.run(captured_task.task_func()) + from galileo_core.helpers.execution import async_run + + async_run(captured_task.task_func()) mock_core_api_client_instance.update_trace.assert_called_with(request) 
assert request.trace_id == trace_id diff --git a/tests/test_openai.py b/tests/test_openai.py index 67d777cdb..bc218a295 100644 --- a/tests/test_openai.py +++ b/tests/test_openai.py @@ -55,7 +55,7 @@ def test_basic_openai_call( assert response == "The mock is working! ;)" galileo_context.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert len(payload.traces) == 1 assert payload.traces[0].status_code == 200 @@ -108,7 +108,7 @@ def test_streamed_openai_call( assert chunk_count == 3 galileo_context.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert len(payload.traces) == 1 assert payload.traces[0].status_code == 200 @@ -154,7 +154,7 @@ def call_openai(model: str = "gpt-3.5-turbo"): galileo_context.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert len(payload.traces) == 1 assert len(payload.traces[0].spans) == 1 @@ -202,8 +202,8 @@ def call_openai(model: str = "gpt-3.5-turbo"): galileo_context.flush() openai_create.assert_called_once() - mock_core_api_instance.ingest_traces_sync.assert_called() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + mock_core_api_instance.ingest_traces.assert_called() + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert len(payload.traces) == 1 @@ -239,8 +239,8 @@ def call_openai(model: str = "gpt-3.5-turbo"): galileo_context.flush() openai_create.assert_called_once() - mock_core_api_instance.ingest_traces_sync.assert_called() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + mock_core_api_instance.ingest_traces.assert_called() + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert len(payload.traces) == 1 assert payload.traces[0].status_code == 401 assert 
payload.traces[0].output == "" @@ -278,8 +278,8 @@ def call_openai(model: str = "gpt-3.5-turbo"): mock_projects_client.assert_called_once() openai_create.assert_called_once() - mock_core_api_instance.ingest_traces_sync.assert_called() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + mock_core_api_instance.ingest_traces.assert_called() + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert len(payload.traces) == 1 assert len(payload.traces[0].spans) == 1 @@ -299,7 +299,6 @@ def test_galileo_api_client_transport_error_not_blocking_user_code( m = mock_core_api_client.return_value m.get_project_by_name = AsyncMock(side_effect=httpx.HTTPError("http error")) m.get_log_stream_by_name = AsyncMock(side_effect=httpx.HTTPError("http error")) - m.ingest_traces_sync = AsyncMock(side_effect=httpx.HTTPError("http error")) m.ingest_traces = AsyncMock(side_effect=httpx.HTTPError("http error")) setup_mock_projects_client(mock_projects_client) @@ -353,7 +352,7 @@ def test_openai_calls_in_active_trace( logger.conclude(output="trace completed", duration_ns=1000) logger.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert len(payload.traces) == 1 assert len(payload.traces[0].spans) == 2 diff --git a/tests/test_openai_agents.py b/tests/test_openai_agents.py index c0300b69b..7bde8f73a 100644 --- a/tests/test_openai_agents.py +++ b/tests/test_openai_agents.py @@ -127,7 +127,47 @@ async def test_simple_agent( assert span.metrics.duration_ns assert span.metrics.duration_ns > 0 + await galileo_logger.async_flush() + payload = mock_core_api_instance.ingest_traces.call_args[0][0] + assert len(payload.traces) == 1 + assert len(payload.traces[0].spans) == 1 + + +@vcr.use_cassette( + "tests/fixtures/openai_agents.yaml", + filter_headers=["authorization"], + decode_compressed_response=True, + record_mode=vcr.mode.NEW_EPISODES, +) 
+@patch("galileo.logger.logger.LogStreams") +@patch("galileo.logger.logger.Projects") +@patch("galileo.logger.logger.GalileoCoreApiClient") +def test_simple_agent_sync_flush( + mock_core_api_client: Mock, mock_projects_client: Mock, mock_logstreams_client: Mock, monkeypatch: MonkeyPatch +) -> None: + """Test sync flush() method - this test is NOT async""" + mock_core_api_instance = setup_mock_core_api_client(mock_core_api_client) + setup_mock_projects_client(mock_projects_client) + setup_mock_logstreams_client(mock_logstreams_client) + + galileo_logger = GalileoLogger(project="test", log_stream="test") + + # Add a simple trace manually (no async Runner.run) + galileo_logger.start_trace(input="Test input") + galileo_logger.add_llm_span( + input="Test input", + output="Test output", + model="gpt-4o", + num_input_tokens=5, + num_output_tokens=5, + total_tokens=10, + ) + galileo_logger.conclude(output="Test output") + + # This should work since we're NOT in an async context galileo_logger.flush() - payload = mock_core_api_instance.ingest_traces_sync.call_args[0][0] + + # Check that sync flush() sent the payload through ingest_traces + payload = mock_core_api_instance.ingest_traces.call_args[0][0] assert len(payload.traces) == 1 assert len(payload.traces[0].spans) == 1 diff --git a/tests/testutils/setup.py b/tests/testutils/setup.py index 693c75c6c..6523c9034 100644 --- a/tests/testutils/setup.py +++ b/tests/testutils/setup.py @@ -260,7 +260,6 @@ def setup_mock_core_api_client(mock_core_api_client: Mock): mock_instance = mock_core_api_client.return_value mock_instance.get_project_by_name = Mock(return_value={"id": UUID("6c4e3f7e-4a9a-4e7e-8c1f-3a9a3a9a3a9a")}) mock_instance.get_log_stream_by_name = Mock(return_value={"id": UUID("6c4e3f7e-4a9a-4e7e-8c1f-3a9a3a9a3a9b")}) - mock_instance.ingest_traces_sync = Mock(return_value={}) mock_instance.ingest_traces = AsyncMock(return_value={}) mock_instance.ingest_spans = AsyncMock(return_value={}) mock_instance.ingest_spans_sync = Mock(return_value={})