Conversation

@Darktex
Contributor

@Darktex Darktex commented Jan 30, 2026

Summary

Convert EnvClient and all subclasses to async by default, per the design approved in Issue #342:

  • EnvClient = async (the new default)
  • SyncEnvClient = sync wrapper (only named because it's the exception)
  • .sync() method = returns SyncEnvClient to stop async infectiousness

New Class Hierarchy

EnvClient (async)                     # Refactored to use async websockets
    ├── GenericEnvClient              # Now async, same API
    └── MCPClientBase                 # Now async, same API
          └── MCPToolClient

SyncEnvClient (wrapper)               # New: wraps any EnvClient for sync usage
    └── .sync() method on EnvClient returns this

Changes

  • src/openenv/core/sync_client.py (NEW): SyncEnvClient wrapper with _run_async_safely() helper
  • src/openenv/core/env_client.py: Convert all methods to async, add .sync() method
  • src/openenv/core/mcp_client.py: Convert list_tools, call_tool, get_tool, has_tool to async
  • src/openenv/auto/auto_env.py: Use _run_async_safely() internally so AutoEnv.from_env() remains sync
  • Updated exports in __init__.py files

Usage

Async usage (new default):

async with GenericEnvClient(base_url="ws://localhost:8000") as env:
    result = await env.reset()
    result = await env.step({"code": "print('hello')"})

Sync wrapper:

env = GenericEnvClient(base_url="ws://localhost:8000").sync()
with env:
    result = env.reset()
    result = env.step({"code": "print('hello')"})
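For readers curious about what a wrapper of this kind does, here is a minimal sketch rather than the code in this PR. `FakeAsyncClient` and `SketchSyncClient` are hypothetical stand-ins with no networking, and the sketch assumes no event loop is already running on the calling thread (the PR's `_run_async_safely()` helper is described as handling that case as well).

```python
import asyncio


class FakeAsyncClient:
    """Hypothetical stand-in for an async EnvClient; no network involved."""

    def __init__(self):
        self.connected = False

    async def connect(self):
        self.connected = True

    async def reset(self):
        return {"observation": "start", "done": False}

    async def step(self, action):
        return {"observation": f"ran {action['code']}", "done": False}

    async def close(self):
        self.connected = False

    def sync(self):
        # Mirrors the `.sync()` idea: hand back a blocking wrapper around self.
        return SketchSyncClient(self)


class SketchSyncClient:
    """Hypothetical sync wrapper: each call drives one coroutine to completion."""

    def __init__(self, client):
        self._client = client
        # One private loop reused across calls, so state created in
        # connect() stays on the same loop for reset()/step()/close().
        self._loop = asyncio.new_event_loop()

    def _run(self, coro):
        return self._loop.run_until_complete(coro)

    def __enter__(self):
        self._run(self._client.connect())
        return self

    def __exit__(self, exc_type, exc, tb):
        self._run(self._client.close())
        self._loop.close()
        return False

    def reset(self):
        return self._run(self._client.reset())

    def step(self, action):
        return self._run(self._client.step(action))


def demo():
    inner = FakeAsyncClient()
    with inner.sync() as env:
        first = env.reset()
        second = env.step({"code": "print('hello')"})
        was_connected = inner.connected
    return first, second, was_connected, inner.connected
```

The point of the design is that callers of the wrapper never see a coroutine, so the `await` requirement stops at the wrapper boundary.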

Test plan

  • All core tests pass (111 passed, 5 skipped)
  • Updated tests use pytest.mark.asyncio and AsyncMock where appropriate
  • Added tests for SyncEnvClient wrapper functionality
  • Added import tests for SyncEnvClient
  • Code formatted with ruff

Breaking Change

This is a breaking change as approved in Issue #342. Callers must either:

  1. Update to async/await syntax
  2. Use .sync() to get a SyncEnvClient wrapper

Closes #342

@meta-cla meta-cla bot added the CLA Signed label Jan 30, 2026
@greptile-apps

greptile-apps bot commented Jan 30, 2026

Greptile Overview

Greptile Summary

Converted EnvClient and all subclasses to async by default, per the approved design in Issue #342, with a SyncEnvClient wrapper (obtained via the .sync() method) for backwards compatibility.

Key Changes:

  • EnvClient now uses websockets.asyncio instead of websockets.sync; all methods (connect, reset, step, state, close) are async
  • SyncEnvClient wrapper class uses _run_async_safely() to handle async calls from both sync and async contexts (runs in thread pool if already in event loop)
  • MCPClientBase and MCPToolClient methods (list_tools, call_tool, get_tool, has_tool) converted to async
  • AutoEnv.from_env() remains sync by wrapping async calls with _run_async_safely()
  • Tests updated to use @pytest.mark.asyncio, AsyncMock, and async/await patterns
  • CI updated to install pytest-asyncio and configure pytest for async mode
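The "runs in thread pool if already in event loop" behaviour can be illustrated with a short sketch. This is an assumption about the general technique, not the PR's actual `_run_async_safely()`; the name `run_coro_blocking` and the helper coroutines are hypothetical.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def run_coro_blocking(coro):
    """Run `coro` to completion and return its result from sync code.

    Hypothetical sketch of the behaviour described above, not the PR's code.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running on this thread: asyncio.run() is safe.
        return asyncio.run(coro)

    # A loop is already running here, so asyncio.run() would raise.
    # Run the coroutine on a fresh loop in a worker thread and block on it.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()


async def add(a, b):
    await asyncio.sleep(0)
    return a + b


async def called_from_async_code():
    # Simulates sync library code being invoked while a loop is running.
    return run_coro_blocking(add(2, 3))


plain_result = run_coro_blocking(add(1, 2))
nested_result = asyncio.run(called_from_async_code())
```

Note that in the nested case the calling thread (and therefore its event loop) is blocked until the worker finishes, which is acceptable for short calls but worth keeping in mind for long ones.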

Breaking Change: As documented, this is a breaking change requiring users to either adopt async/await or use the .sync() wrapper. The sync __enter__ raises a helpful TypeError directing users to the correct usage.

Design Alignment: Changes follow Gymnasium-style API (reset/step/state signatures unchanged), maintain client-server separation, and do not expose any agent controls via MCP. The async conversion improves I/O efficiency while the sync wrapper prevents "async infection" for users who need synchronous APIs.

Confidence Score: 5/5

  • This PR is safe to merge with minimal risk
  • Score reflects thorough implementation with comprehensive test coverage (111 tests passing), consistent async conversion across all client classes, proper use of asyncio.wait_for() for message timeouts (restoring functionality mentioned in previous thread), and excellent backwards compatibility via SyncEnvClient wrapper
  • No files require special attention

Important Files Changed

| Filename | Overview |
| --- | --- |
| src/openenv/core/sync_client.py | New file implementing SyncEnvClient wrapper with _run_async_safely() helper for sync usage |
| src/openenv/core/env_client.py | Converted all methods to async, added __aenter__/__aexit__, sync() method, helpful error on sync __enter__ |
| src/openenv/core/mcp_client.py | Converted list_tools, call_tool, get_tool, has_tool to async, updated docstrings |
| src/openenv/auto/auto_env.py | Added _run_async_safely() to wrap from_env/from_docker_image calls, keeps AutoEnv.from_env() sync |
| tests/test_core/test_generic_client.py | Updated tests with @pytest.mark.asyncio, AsyncMock, and async/await syntax |

Sequence Diagram

sequenceDiagram
    participant User
    participant EnvClient
    participant SyncEnvClient
    participant WebSocket
    participant EnvServer

    Note over User,EnvServer: Async Usage Pattern
    User->>EnvClient: async with EnvClient(base_url)
    EnvClient->>EnvClient: __aenter__()
    EnvClient->>WebSocket: await ws_connect()
    WebSocket-->>EnvClient: connection established
    EnvClient-->>User: connected client

    User->>EnvClient: await reset()
    EnvClient->>WebSocket: await send({"type": "reset"})
    WebSocket->>EnvServer: reset message
    EnvServer-->>WebSocket: response
    WebSocket-->>EnvClient: await recv() with timeout
    EnvClient-->>User: StepResult

    User->>EnvClient: await step(action)
    EnvClient->>WebSocket: await send({"type": "step"})
    WebSocket->>EnvServer: step message
    EnvServer-->>WebSocket: response
    WebSocket-->>EnvClient: await recv() with timeout
    EnvClient-->>User: StepResult

    User->>EnvClient: __aexit__()
    EnvClient->>WebSocket: await close()

    Note over User,EnvServer: Sync Wrapper Pattern
    User->>EnvClient: client.sync()
    EnvClient->>SyncEnvClient: SyncEnvClient(self)
    SyncEnvClient-->>User: sync wrapper

    User->>SyncEnvClient: with sync_client
    SyncEnvClient->>SyncEnvClient: _run_async_safely()
    SyncEnvClient->>EnvClient: await connect()
    EnvClient->>WebSocket: await ws_connect()
    WebSocket-->>EnvClient: connection
    EnvClient-->>SyncEnvClient: result
    SyncEnvClient-->>User: connected

    User->>SyncEnvClient: reset()
    SyncEnvClient->>SyncEnvClient: _run_async_safely()
    SyncEnvClient->>EnvClient: await reset()
    EnvClient->>WebSocket: await send/recv
    WebSocket->>EnvServer: message
    EnvServer-->>WebSocket: response
    WebSocket-->>EnvClient: response
    EnvClient-->>SyncEnvClient: StepResult
    SyncEnvClient-->>User: StepResult

@greptile-apps greptile-apps bot left a comment

4 files reviewed, 1 comment

"""Receive and parse a message from the WebSocket."""
assert self._ws is not None
- raw = self._ws.recv(timeout=self._message_timeout)
+ raw = await self._ws.recv()

The message_timeout_s parameter is stored in self._message_timeout on line 109 but no longer used here. In the sync version, this was passed as timeout=self._message_timeout to recv(). Check if websockets.asyncio supports timeouts via asyncio.wait_for() or if this timeout should be removed from the constructor.

Contributor Author

Already fixed in fd24481 - the timeout is now used via asyncio.wait_for(self._ws.recv(), timeout=self._message_timeout) on line 187.

@Darktex
Contributor Author

Darktex commented Jan 30, 2026

Addressed Greptile's feedback in 294f3f6:

Fixed the missing timeout on async recv() by adding asyncio.wait_for():

raw = await asyncio.wait_for(self._ws.recv(), timeout=self._message_timeout)

The message_timeout_s parameter is now properly applied to the async WebSocket receive operation.
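To see the effect of the `asyncio.wait_for()` call in isolation, here is a small self-contained sketch. `SlowSocket` and `receive` are hypothetical stand-ins for the WebSocket and the client's receive helper; returning `None` on timeout is a choice made for the sketch, and the real client may surface the timeout differently.

```python
import asyncio


class SlowSocket:
    """Hypothetical stand-in for a websocket whose recv() may stall."""

    def __init__(self, delay_s, payload):
        self._delay_s = delay_s
        self._payload = payload

    async def recv(self):
        await asyncio.sleep(self._delay_s)
        return self._payload


async def receive(ws, message_timeout_s):
    # Same shape as the fix: bound the wait on recv() with wait_for().
    try:
        return await asyncio.wait_for(ws.recv(), timeout=message_timeout_s)
    except asyncio.TimeoutError:
        # wait_for() cancels the pending recv() before raising.
        return None


fast = asyncio.run(receive(SlowSocket(0.0, '{"ok": true}'), message_timeout_s=1.0))
slow = asyncio.run(receive(SlowSocket(5.0, '{"ok": true}'), message_timeout_s=0.05))
```

The second call gives up after roughly 50 ms instead of waiting the full five seconds, which is the behaviour the sync `recv(timeout=...)` call used to provide.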

@Darktex
Contributor Author

Darktex commented Jan 30, 2026

@greptile

@Darktex Darktex changed the title from "Make EnvClient async by default with SyncEnvClient wrapper" to "[1/n] Make EnvClient async by default with SyncEnvClient wrapper" Jan 30, 2026
Convert EnvClient and all subclasses to async by default:
- EnvClient methods (connect, reset, step, state, close) are now async
- MCPToolClient methods (list_tools, call_tool, get_tool, has_tool) are now async
- Use `async with` for context manager, or `.sync()` for synchronous wrapper

Add SyncEnvClient wrapper class:
- Wraps any async EnvClient for synchronous usage
- Uses _run_async_safely() to handle both sync and async calling contexts
- Provides sync context manager support

Update AutoEnv to use _run_async_safely() internally so it remains
synchronous for callers while supporting the new async client methods.

This is a breaking change - callers must either:
1. Update to async/await syntax
2. Use .sync() to get a SyncEnvClient wrapper

Closes #342

The async client tests require pytest-asyncio to run properly.
Added pytest-asyncio to the CI workflow dependencies and configured
asyncio_mode=auto in pyproject.toml for automatic async test detection.

Update test_dipg_client.py to use async/await since EnvClient.reset()
is now an async method.

Use asyncio.wait_for() to apply the message_timeout_s parameter
to the async recv() call, preserving the timeout behavior from the
sync implementation.

Addresses Greptile review feedback.

Implements the Rubric abstraction for reward computation as specified in RFC 004:

- Base Rubric class with forward hooks, child registration, and serialization
- Container rubrics: Sequential, Gate, WeightedSum, RubricList, RubricDict
- Trajectory rubrics: TrajectoryRubric, ExponentialDiscountingTrajectoryRubric
- Environment integration: rubric attribute, _apply_rubric(), _reset_rubric()

This provides a composable, PyTorch-like API for defining reward signals that can
be introspected by training infrastructure.

86 tests covering all rubric functionality.

- Rubric.forward() now supports both sync and async implementations
- Base class auto-detects async forward() and awaits appropriately
- Container rubrics (Sequential, Gate, WeightedSum) handle async children
- WeightedSum executes async children in parallel via asyncio.gather()
- Add _apply_rubric_async() and _reset_rubric_async() to Environment
- Fix pre-hooks to run BEFORE forward() (was incorrectly running after)
- Fix Sequential double-call bug when async detected mid-iteration
- Update RFC 004 to document async-first design
- Add 63 new tests for async rubrics and bug regression
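
The idea of a weighted-sum container that handles both sync and async children via `asyncio.gather()` might look roughly like the sketch below. `SketchWeightedSum` and the two scorer functions are hypothetical and do not reproduce the Rubric classes from this PR; note also that `gather()` runs children concurrently on one event loop rather than on separate threads.

```python
import asyncio
import inspect


class SketchWeightedSum:
    """Hypothetical container: weighted sum over sync or async scorers."""

    def __init__(self, scorers, weights):
        if len(scorers) != len(weights):
            raise ValueError("need one weight per scorer")
        self._scorers = scorers
        self._weights = weights

    async def score(self, action, observation):
        async def run_one(scorer):
            result = scorer(action, observation)
            # Async scorers return an awaitable; sync scorers return a number.
            if inspect.isawaitable(result):
                result = await result
            return result

        # gather() runs the children concurrently and preserves input order.
        values = await asyncio.gather(*(run_one(s) for s in self._scorers))
        return sum(w * v for w, v in zip(self._weights, values))


def length_score(action, observation):
    return float(len(action))


async def slow_match_score(action, observation):
    await asyncio.sleep(0.01)
    return 1.0 if action == observation else 0.0


rubric = SketchWeightedSum([length_score, slow_match_score], [0.5, 2.0])
total = asyncio.run(rubric.score("abc", "abc"))
```

Because `gather()` returns results in the order the children were passed in, the weights can be zipped against the values without any extra bookkeeping.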
Collaborator

@burtenshaw burtenshaw left a comment

Looks good. Just two small changes to clarify.

Also, maybe we should add an agent skill reference .md on migration, covering the changes in this PR specifically.

ObsT = TypeVar("ObsT")


def _run_async_safely(coro):
Collaborator

This function is repeated, right? Should we move it to a utils module?

Contributor Author

Fixed in 1430a51 - extracted run_async_safely to src/openenv/core/utils.py and updated all 3 files that were duplicating this pattern.

if inspect.iscoroutinefunction(self.forward):
    # Async path - pre-hooks will be called in _call_async
    result = self.forward(action, observation)
    return self._call_async(action, observation, result)
Collaborator

Should we call forward inside self._call_async? For async rubrics, self.forward is called on line 71, before the pre-hooks run (lines 92-96 in _call_async). If a pre-hook modifies instance state that forward() reads, the coroutine was already bound with old values.

Contributor Author

Good catch! This was indeed a bug. Fixed in 4e136a2 - we now use inspect.iscoroutinefunction(self.forward) to detect async BEFORE calling, so pre-hooks run first. Added regression tests in test_pre_hook_bugs.py to prevent this from regressing.
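
One way to get the ordering discussed in this thread (detect async without calling forward(), then run pre-hooks, then call forward()) is sketched below. `SketchRubric`, `AsyncScaled`, and `set_scale` are hypothetical names for illustration; the actual fix in the referenced commit may be structured differently.

```python
import asyncio
import inspect


class SketchRubric:
    """Hypothetical base class showing hook ordering only."""

    def __init__(self):
        self._pre_hooks = []
        self.events = []

    def register_pre_hook(self, hook):
        self._pre_hooks.append(hook)

    def _run_pre_hooks(self, action, observation):
        for hook in self._pre_hooks:
            hook(self, action, observation)

    def __call__(self, action, observation):
        # Inspect the function, not its return value, so forward() is not
        # invoked just to find out whether it is async.
        if inspect.iscoroutinefunction(self.forward):
            return self._call_async(action, observation)
        self._run_pre_hooks(action, observation)
        return self.forward(action, observation)

    async def _call_async(self, action, observation):
        # Pre-hooks run first; forward() is only called afterwards.
        self._run_pre_hooks(action, observation)
        return await self.forward(action, observation)


class AsyncScaled(SketchRubric):
    scale = 1.0

    async def forward(self, action, observation):
        self.events.append("forward")
        return self.scale * len(action)


def set_scale(rubric, action, observation):
    rubric.events.append("pre_hook")
    rubric.scale = 10.0


rubric = AsyncScaled()
rubric.register_pre_hook(set_scale)
score = asyncio.run(rubric("abcd", None))
```

Here the pre-hook's change to `scale` is visible to forward(), and the recorded event order confirms the hook ran first.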

Addresses review feedback from @burtenshaw to consolidate
duplicated _run_async_safely function into a shared utility.

- Add run_async_safely() to src/openenv/core/utils.py
- Update sync_client.py, auto_env.py, mcp_environment.py to use it
- Net reduction of 57 lines of duplicated code
Labels

CLA Signed This label is managed by the Meta Open Source bot.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[BREAKING CHANGES] Implement async in Core, migrate envs to zoo, migrate to MCP server by default

3 participants