Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion examples/03_parallel_eval_with_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ class Args:
num_parallel_workers: int = 20
# If None, run on all tasks. Otherwise, limit to `num_tasks` tasks.
num_tasks: int | None = None
# Maximum wall-clock time per task in seconds. Tasks that exceed this limit are
# cancelled and reported as errors. Prevents hung container execs from blocking
# semaphore slots indefinitely.
task_timeout_s: float = 30 * 60 # 30 minutes


async def evaluate_task(
Expand Down Expand Up @@ -131,7 +135,10 @@ async def _await_with_semaphore(
task: Awaitable[ares.TimeStep[Any, float, float]],
) -> ares.TimeStep[Any, float, float]:
async with sem:
return await task
# Wrap each task with a wall-clock timeout. This is the outermost safeguard against
# hung container exec calls that bypass per-operation timeouts (e.g., when the
# underlying container runtime doesn't properly propagate asyncio cancellation).
return await asyncio.wait_for(task, timeout=args.task_timeout_s)

tasks = [
_await_with_semaphore(evaluate_task(args.preset_name, task_idx, agent, container_factory, dashboard))
Expand Down
11 changes: 7 additions & 4 deletions src/ares/code_agents/mini_swe_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,13 @@ async def run(self, task: str) -> None:
with self.tracker.timeit("mswea/setup"):
_LOGGER.debug("[%d] Starting mini-swe-agent run.", id(self))

system = (await self.container.exec_run("uname -a", env=self._environment_env_vars)).output.strip()
release = (await self.container.exec_run("uname -r", env=self._environment_env_vars)).output.strip()
version = (await self.container.exec_run("uname -v", env=self._environment_env_vars)).output.strip()
machine = (await self.container.exec_run("uname -m", env=self._environment_env_vars)).output.strip()
# Use a short timeout for system info commands - these should complete quickly.
setup_timeout_s = 30
env = self._environment_env_vars
system = (await self.container.exec_run("uname -a", timeout_s=setup_timeout_s, env=env)).output.strip()
release = (await self.container.exec_run("uname -r", timeout_s=setup_timeout_s, env=env)).output.strip()
version = (await self.container.exec_run("uname -v", timeout_s=setup_timeout_s, env=env)).output.strip()
machine = (await self.container.exec_run("uname -m", timeout_s=setup_timeout_s, env=env)).output.strip()

_LOGGER.debug("[%d] System information: %s %s %s %s", id(self), system, release, version, machine)

Expand Down
30 changes: 26 additions & 4 deletions src/ares/environments/code_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ async def step(self, action: response.LLMResponse) -> base.TimeStep[request.LLMR

return ts

# Maximum time to wait for the code agent to make an LLM request or complete.
# This is a backstop against cases where individual operation timeouts are bypassed
# (e.g., if the underlying container runtime doesn't properly propagate cancellation).
_GET_TIME_STEP_TIMEOUT_S = 600 # 10 minutes

async def _get_time_step(
self,
) -> base.TimeStep[request.LLMRequest | None, float, float]:
Expand All @@ -168,7 +173,21 @@ async def _get_time_step(
with self._tracker.timeit(f"{self._prefix}/get_from_queue"):
get_from_queue_task = asyncio.create_task(self._llm_client.q.get())
tasks = [self._code_agent_task, get_from_queue_task]
done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
done, pending = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED, timeout=self._GET_TIME_STEP_TIMEOUT_S
)

if not done:
# Neither the code agent nor the LLM request queue responded within the timeout.
# Cancel both tasks to prevent resource leaks.
for task in pending:
task.cancel()
raise TimeoutError(
f"[{id(self)}] Timed out waiting for code agent to make an LLM request or complete "
f"(timeout={self._GET_TIME_STEP_TIMEOUT_S}s). "
"This may indicate a hung container exec call or a non-cancellable async operation."
)

_LOGGER.debug("[%d] Code agent or LLM request completed.", id(self))

if self._code_agent_task in done:
Expand Down Expand Up @@ -274,6 +293,9 @@ async def _start_code_agent(self) -> None:
self._code_agent_task = asyncio.create_task(self._code_agent.run(self._current_task.instruction))
_LOGGER.debug("[%d] Code agent started.", id(self))

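# Timeout passed as `timeout_s` to each container exec call made while computing the reward
# (creating the verifier directory, running the tests, reading the reward file).
# Tests can be slow, but must not hang forever.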
_REWARD_EXEC_TIMEOUT_S = 300 # 5 minutes

async def _compute_reward(self) -> float:
"""Run tests and compute the reward for the current episode."""
if self._container is None:
Expand All @@ -289,14 +311,14 @@ async def _compute_reward(self) -> float:

_LOGGER.debug("[%d] Creating verifier directory", id(self))
verifier_dir = str(harbor_paths.EnvironmentPaths.verifier_dir)
await self._container.exec_run(command=f"mkdir -p {verifier_dir}")
await self._container.exec_run(command=f"mkdir -p {verifier_dir}", timeout_s=self._REWARD_EXEC_TIMEOUT_S)

_LOGGER.debug("[%d] Running tests and evaluating.", id(self))
test_path = str(
pathlib.Path("/tests") / self._current_task.paths.test_path.relative_to(self._current_task.paths.tests_dir)
)
# TODO: Log the output of the test execution somewhere that makes sense
test_result = await self._container.exec_run(command=f"bash {test_path}")
test_result = await self._container.exec_run(command=f"bash {test_path}", timeout_s=self._REWARD_EXEC_TIMEOUT_S)
_LOGGER.debug("[%d] Test result: %s.", id(self), test_result.output)

# Try to read reward from both
Expand All @@ -322,7 +344,7 @@ async def _parse_reward_file(self, remote_path: pathlib.Path | str) -> float | N
raise RuntimeError("Container has not been created before parsing reward file.")

remote_path = str(remote_path)
cat_result = await self._container.exec_run(command=f"cat {remote_path}")
cat_result = await self._container.exec_run(command=f"cat {remote_path}", timeout_s=self._REWARD_EXEC_TIMEOUT_S)
if cat_result.exit_code != 0:
# File doesn't exist
return None
Expand Down
Loading