Skip to content

Commit 8152c6e

Browse files
authored
Merge pull request #2068 from BloggerBust/fix/async-gather-reuse-hang
fix(async,ci): prevent coroutine re-await & stabilize async evals
2 parents 3041d6e + 96e1e10 commit 8152c6e

9 files changed

Lines changed: 357 additions & 77 deletions

File tree

.github/workflows/test_core.yml

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ on:
88
jobs:
99
test:
1010
runs-on: ubuntu-latest
11+
env:
12+
# Expose once at job level: `secrets.*` cannot be referenced directly in step `if:` conditions, and the value is empty on forked PRs, so steps branch on `env.OPENAI_API_KEY` instead.
13+
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
14+
1115
steps:
1216
#----------------------------------------------
1317
# check-out repo and set-up python
@@ -54,20 +58,46 @@ jobs:
5458
#----------------------------------------------
5559
# run test suite
5660
#----------------------------------------------
61+
62+
# Run tests (with secrets): full suite
5763
- name: Run tests
58-
env:
59-
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
64+
if: ${{ env.OPENAI_API_KEY != '' }}
65+
run: |
66+
poetry run pytest -vv -rA --maxfail=1 --capture=tee-sys \
67+
tests/test_core/ \
68+
--ignore=tests/test_core/test_synthesizer/ \
69+
--ignore=tests/test_core/test_datasets/
70+
71+
# Run tests (no secrets): skip e2e that require API keys
72+
- name: Run tests (no secrets)
73+
if: ${{ env.OPENAI_API_KEY == '' }}
6074
run: |
61-
poetry run pytest -vv -rA --maxfail=1 --capture=tee-sys tests/test_core/ --ignore=tests/test_core/test_synthesizer/ --ignore=tests/test_core/test_datasets/ --ignore=tests/test_core/test_tracing/test_dataset_iterator.py
75+
poetry run pytest -vv -rA --maxfail=1 --capture=tee-sys tests/test_core/ \
76+
--ignore=tests/test_core/test_synthesizer/ \
77+
--ignore=tests/test_core/test_datasets/ \
78+
--ignore=tests/test_core/test_tracing/test_dataset_iterator.py \
79+
--ignore=tests/test_core/test_evaluation/test_end_to_end/test_configs.py
6280
6381
#----------------------------------------------
6482
# install dev dependencies (including chromadb) and run synthesizer tests
6583
#----------------------------------------------
6684
- name: Install dev dependencies
6785
run: poetry install --no-interaction --with dev
6886

69-
- name: Run core tests with dev dependencies
70-
env:
71-
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
87+
# Dev tests (with secrets)
88+
- name: Run dev tests
89+
if: ${{ env.OPENAI_API_KEY != '' }}
90+
run: |
91+
poetry run pytest -vv -rA --maxfail=1 --capture=tee-sys -o faulthandler_timeout=300 \
92+
tests/test_core/test_synthesizer/ tests/test_core/test_datasets/
93+
94+
# Dev tests (no secrets)
95+
- name: Run dev tests (no secrets)
96+
if: ${{ env.OPENAI_API_KEY == '' }}
7297
run: |
73-
poetry run pytest -vv -rA --maxfail=1 --capture=tee-sys tests/test_core/test_synthesizer/ tests/test_core/test_datasets/ --ignore=tests/test_core/test_tracing/test_dataset_iterator.py
98+
poetry run pytest -vv -rA --maxfail=1 --capture=tee-sys tests/test_core/test_synthesizer/ tests/test_core/test_datasets/ \
99+
--ignore=tests/test_core/test_tracing/test_dataset_iterator.py \
100+
--ignore=tests/test_core/test_synthesizer/test_context_generator.py \
101+
--ignore=tests/test_core/test_synthesizer/test_conversation_simulator.py \
102+
--ignore=tests/test_core/test_synthesizer/test_generate_from_goldens.py \
103+
--ignore=tests/test_core/test_synthesizer/test_synthesizer.py

deepeval/config/settings.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,7 @@ class Settings(BaseSettings):
281281
#
282282
# Telemetry and Debug
283283
#
284+
DEEPEVAL_DEBUG_ASYNC: Optional[bool] = None
284285
DEEPEVAL_TELEMETRY_OPT_OUT: Optional[bool] = None
285286
DEEPEVAL_UPDATE_WARNING_OPT_IN: Optional[bool] = None
286287
DEEPEVAL_GRPC_LOGGING: Optional[bool] = None
@@ -303,6 +304,19 @@ class Settings(BaseSettings):
303304
MEDIA_IMAGE_CONNECT_TIMEOUT_SECONDS: float = 3.05
304305
MEDIA_IMAGE_READ_TIMEOUT_SECONDS: float = 10.0
305306

307+
#
308+
# Async Task Configuration
309+
#
310+
311+
# Maximum time, in seconds, allowed for a single task to complete.
# Typed as float (not int) so that float('inf'), the documented way to
# disable the timeout, is a value the field can actually hold.
DEEPEVAL_PER_TASK_TIMEOUT_SECONDS: float = 300
315+
316+
# Buffer time for gathering results from all tasks, added to the longest task duration
317+
# Increase if many tasks are running concurrently
318+
DEEPEVAL_TASK_GATHER_BUFFER_SECONDS: int = 60
319+
306320
##############
307321
# Validators #
308322
##############

deepeval/dataset/dataset.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
from asyncio import Task
2-
from typing import Iterator, List, Optional, Union, Literal
2+
from typing import TYPE_CHECKING, Iterator, List, Optional, Union, Literal
33
from dataclasses import dataclass, field
44
from opentelemetry.trace import Tracer
55
from opentelemetry.context import Context, attach, detach
66
from rich.console import Console
77
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn
88
import json
99
import csv
10-
import webbrowser
1110
import os
1211
import datetime
1312
import time
@@ -17,6 +16,7 @@
1716

1817
from deepeval.confident.api import Api, Endpoints, HttpMethods
1918
from deepeval.dataset.utils import (
19+
coerce_to_task,
2020
convert_test_cases_to_goldens,
2121
convert_goldens_to_test_cases,
2222
convert_convo_goldens_to_convo_test_cases,
@@ -49,11 +49,18 @@
4949
from deepeval.test_run import (
5050
global_test_run_manager,
5151
)
52-
from deepeval.dataset.types import global_evaluation_tasks
5352
from deepeval.openai.utils import openai_test_case_pairs
5453
from deepeval.tracing import trace_manager
5554
from deepeval.tracing.tracing import EVAL_DUMMY_SPAN_NAME
5655

56+
if TYPE_CHECKING:
57+
from deepeval.evaluate.configs import (
58+
AsyncConfig,
59+
DisplayConfig,
60+
CacheConfig,
61+
ErrorConfig,
62+
)
63+
5764

5865
valid_file_types = ["csv", "json", "jsonl"]
5966

@@ -1230,7 +1237,7 @@ def evals_iterator(
12301237
)
12311238

12321239
def evaluate(self, task: Task):
    """Hand ``task`` to ``coerce_to_task`` so it is scheduled on the loop.

    ``coerce_to_task`` returns tasks and futures unchanged and wraps bare
    coroutines or other awaitables in a new asyncio task. The value it
    returns is not retained by this method.
    """
    coerce_to_task(task)
12341241

12351242
def _start_otel_test_run(self, tracer: Optional[Tracer] = None) -> Context:
12361243
_tracer = check_tracer(tracer)

deepeval/dataset/types.py

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,25 @@
1-
class EvaluationTasks:
2-
tasks: list = []
1+
import asyncio
32

4-
def append(self, t):
5-
self.tasks.append(t)
3+
from typing import Any
4+
from deepeval.dataset.utils import coerce_to_task
65

7-
def get_tasks(self):
8-
return self.tasks
96

10-
def num_tasks(self):
11-
return len(self.tasks)
7+
class EvaluationTasks:
    """Per-instance collection of scheduled evaluation tasks/futures."""

    def __init__(self):
        # Instance-level list, so separate EvaluationTasks objects never
        # share state.
        self._tasks: list[asyncio.Future] = []

    def append(self, obj: Any) -> None:
        """Coerce ``obj`` with ``coerce_to_task`` and record the result."""
        self._tasks.append(coerce_to_task(obj))

    def get_tasks(self) -> list[asyncio.Future]:
        """Return a shallow copy of the tracked tasks.

        A copy is returned so callers cannot mutate the internal list.
        """
        return list(self._tasks)

    def num_tasks(self) -> int:
        """Return how many tasks are currently tracked."""
        return len(self._tasks)

    def clear_tasks(self) -> None:
        """Cancel every tracked task that is not done, then forget them all."""
        for task in self._tasks:
            if task.done():
                continue
            task.cancel()
        self._tasks.clear()

deepeval/dataset/utils.py

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
from typing import List, Optional, Any
1+
import asyncio
2+
import inspect
23
import json
34
import re
45

6+
from typing import List, Optional, Any
57
from opentelemetry.trace import Tracer
6-
from opentelemetry import trace
7-
from opentelemetry.trace import NoOpTracerProvider
88

99
from deepeval.dataset.api import Golden
1010
from deepeval.dataset.golden import ConversationalGolden
@@ -174,3 +174,31 @@ def check_tracer(tracer: Optional[Tracer] = None) -> Tracer:
174174
)
175175

176176
return GLOBAL_TEST_RUN_TRACER
177+
178+
179+
def coerce_to_task(obj: Any) -> asyncio.Future[Any]:
    """Return ``obj`` as a task/future that is scheduled on the event loop.

    Tasks and futures are returned unchanged. A bare coroutine, or any other
    object implementing ``__await__``, is wrapped in a new task so it is
    bound to the running loop and awaited only through that task.

    Args:
        obj: An ``asyncio.Task``, ``asyncio.Future``, coroutine object, or
            generic awaitable.

    Returns:
        ``obj`` itself when it is already a task/future, otherwise a new
        ``asyncio.Task`` wrapping it.

    Raises:
        TypeError: If ``obj`` is not awaitable.
        RuntimeError: Propagated from ``asyncio.create_task`` when a
            coroutine/awaitable is given while no event loop is running.
    """
    # Tasks are futures too, so one check covers both; they are already
    # scheduled and must not be wrapped again.
    if asyncio.isfuture(obj):
        return obj

    # A bare coroutine must be explicitly scheduled with create_task so it
    # is bound to the running loop and tracked.
    if asyncio.iscoroutine(obj):
        return asyncio.create_task(obj)

    # create_task only accepts coroutines, so a generic awaitable (any
    # object with __await__) is wrapped in a small coroutine first.
    if inspect.isawaitable(obj):

        async def _wrap(awaitable):
            return await awaitable

        return asyncio.create_task(_wrap(obj))

    # Not awaitable at all: fail loudly instead of silently dropping it.
    raise TypeError(
        f"Expected Task/Future/coroutine/awaitable, got {type(obj).__name__}"
    )

0 commit comments

Comments
 (0)