The AtroposLib API is a FastAPI application designed to act as a central buffer and aggregator for reinforcement learning (RL) experience data. Its primary purpose is to decouple RL data generation (by "Rollout Handlers" or "Environments") from RL data consumption (by one or more "Trainers"), particularly in distributed online RL settings.
This service specifically handles the experience data pathway:
- Rollout Handlers connect and push trajectories (tokens, masks, scores, etc.).
- The API buffers this data in a queue.
- Trainers connect and pull processed batches of experience data for training updates.
Important: This service does not handle the distribution of updated policies from the Trainer back to the Rollout Handlers/Inference Servers. That part of the online RL loop is assumed to be handled by a separate mechanism.
Key features:

- Centralized, in-memory queue for RL trajectory data.
- Registration endpoints for Trainers and Rollout Handlers.
- Serves batches of aggregated experience data to Trainers.
- Supports heterogeneous environments with weighting (via the `/register-env` weight and internal batching).
- Minimum batch allocation support to guarantee certain environments contribute a minimum proportion of each batch.
- Provides status endpoints for monitoring queue size and training step count.
- Basic integration with Weights & Biases (W&B) project/group info.
- Endpoints for Rollout Handlers to disconnect gracefully.
- Debug endpoint to retrieve the latest submitted data sample.
This API typically sits within a larger RL system:
- Rollout Handlers: Instances simulating the environment. They interact with Inference Servers to get actions based on the current policy and send the resulting trajectory data (`ScoredData`) to this AtroposLib API (`/scored_data`).
- Inference Servers (External): Serve the current policy (e.g., via an OpenAI-compatible API) and receive policy updates directly from the Trainer. Not part of this service.
- AtroposLib API (This Service): Buffers and batches experience data received from Rollout Handlers.
- Trainer(s): Pull batches of experience data from this API (`/batch`), compute gradients, update the policy, and push updated policies directly to the Inference Servers.
With the repository installed, a helper script is provided to run the server:

```bash
run-api
```

If you need more control over the server, you can run it directly with:

```bash
uvicorn atroposlib.api.server:app --host 0.0.0.0 --port 8000 --reload
```

- `--host 0.0.0.0`: Makes the server accessible on your network.
- `--port 8000`: Specifies the port (change if needed).
- `--reload`: Enables auto-reloading on code changes (for development). Remove for production.
The API documentation (Swagger UI) will be available at http://<your-server-ip>:8000/docs.
### `GET /`

- Description: Root endpoint for a basic health check.
- Response: `{"message": "AtroposLib API"}`
### `POST /register`

- Description: Called once by the Trainer process to initialize the server state for a training run. Resets state if called again.
- Request Body: `Registration` model

  ```python
  class Registration(BaseModel):
      wandb_group: str
      wandb_project: str
      batch_size: int
      max_token_len: int  # Max token length expected in trajectories
      checkpoint_dir: str  # Shared location for checkpoints
      save_checkpoint_interval: int
      starting_step: int
      num_steps: int  # Total expected training steps
  ```

- Response: `{"uuid": <generated_uuid_int>}`
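As a sketch of the trainer side, a registration call might look like the following (the server URL, field values, and the use of the stdlib `urllib` client are illustrative assumptions, not part of AtroposLib):

```python
import json
import urllib.request

API_URL = "http://localhost:8000"  # assumed server address

def build_registration(wandb_group: str, wandb_project: str) -> dict:
    """Assemble a payload matching the Registration model (values illustrative)."""
    return {
        "wandb_group": wandb_group,
        "wandb_project": wandb_project,
        "batch_size": 32,
        "max_token_len": 2048,
        "checkpoint_dir": "/shared/checkpoints",
        "save_checkpoint_interval": 100,
        "starting_step": 0,
        "num_steps": 10_000,
    }

if __name__ == "__main__":
    req = urllib.request.Request(
        f"{API_URL}/register",
        data=json.dumps(build_registration("my-group", "my-project")).encode(),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req) as resp:
        print(json.load(resp))  # {"uuid": <generated_uuid_int>}
```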
### `GET /wandb_info`

- Description: Retrieve the W&B group and project info set during registration.
- Response: `{"group": <group_name_or_null>, "project": <project_name_or_null>}`
### `GET /info`

- Description: Retrieve the batch size and max token length set during registration.
- Response: `{"batch_size": <size_or_-1>, "max_token_len": <len_or_-1>}`
### `GET /status`

- Description: Get the current training step (based on batches served) and queue size.
- Response: `{"current_step": <step_count>, "queue_size": <queue_length>}`
### `POST /register-env`

- Description: Called by each Rollout Handler instance to register itself.
- Request Body: `RegisterEnv` model

  ```python
  class RegisterEnv(BaseModel):
      max_token_length: int  # Max length this env produces
      desired_name: str  # Base name for identification/logging
      weight: float  # Weight for sampling/batching (e.g., 1.0)
      group_size: int  # Expected number of sequences per data submission
      min_batch_allocation: Optional[float] = None  # Minimum proportion of batch (0.0-1.0)
  ```

- Response: Provides the assigned ID, a unique W&B name, and checkpoint info.

  ```json
  {
    "status": "success",
    "env_id": <assigned_env_id_int>,
    "wandb_name": <generated_unique_name>,
    "checkpoint_dir": <checkpoint_dir_from_registration>,
    "starting_step": <current_server_step>,
    "checkpoint_interval": <interval_from_registration>,
    "num_steps": <num_steps_from_registration>
  }
  ```
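A rollout handler might register itself with a sketch like this (the URL and field values are assumptions; `post_json` is a hypothetical helper, not part of AtroposLib):

```python
import json
import urllib.request
from typing import Optional

API_URL = "http://localhost:8000"  # assumed server address

def build_register_env(name: str, weight: float = 1.0,
                       min_batch_allocation: Optional[float] = None) -> dict:
    """Assemble a payload matching the RegisterEnv model (values illustrative)."""
    payload = {
        "max_token_length": 512,
        "desired_name": name,
        "weight": weight,
        "group_size": 4,
    }
    if min_batch_allocation is not None:
        payload["min_batch_allocation"] = min_batch_allocation
    return payload

def post_json(path: str, payload: dict) -> dict:
    """Hypothetical helper: POST JSON and decode the JSON response."""
    req = urllib.request.Request(
        f"{API_URL}{path}",
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)

if __name__ == "__main__":
    info = post_json("/register-env", build_register_env("my_env", min_batch_allocation=0.3))
    print(info["env_id"], info["wandb_name"])
```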
### `POST /disconnect-env`

- Description: Allows a Rollout Handler to signal that it is disconnecting gracefully.
- Request Body: `EnvIdentifier` model: `{"env_id": <registered_env_id_int>}`
- Response: `{"status": "success"}` or `{"status": "failure", "error": ...}`
### `GET /status-env`

- Description: Called by a Rollout Handler to get general status plus its calculated sampling weight relative to other connected environments.
- Query Parameter: Requires an `env: EnvIdentifier` model (e.g., `?env_id=0`; the actual implementation might differ slightly, so check the FastAPI docs for query parameter models). Note: the code declares `env: EnvIdentifier` as a body parameter on a GET request, which is non-standard; this may need adjustment or testing. It is assumed to work via a query parameter or a POST instead.
- Response: `{"current_step": <step>, "queue_size": <size>, "env_weight": <calculated_weight_float>}`
### `POST /scored_data`

- Description: Endpoint for Rollout Handlers to push a single chunk of trajectory data.
- Request Body: `ScoredData` model

  ```python
  class ScoredData(BaseModel):
      tokens: List[List[int]]
      masks: List[List[int]]
      scores: List[float]
      advantages: Optional[List[List[float]]] = None
      ref_logprobs: Optional[List[List[float]]] = None
      inference_logprobs: Optional[List[List[float]]] = None
      generation_params: Optional[Dict[str, Any]] = None
      messages: Optional[List[List[Message]]] = None
      overrides: Optional[List[dict]] = None  # Per-item logging overrides
      group_overrides: Optional[dict] = None  # Group logging overrides
      images: Optional[Any] = None  # Image data (if applicable)
      env_id: Optional[int] = None  # ID of the environment that generated this data
  ```

- Expected Data Format:
  - `tokens`: Full unmasked token sequences (prompt + completion).
  - `masks`: Token sequences for training, with `-100` at prompt positions and the actual token IDs at completion positions.
  - `inference_logprobs`: Optional logprob sequences for training, with `1.0` at masked (prompt) positions and the actual logprob values at completion positions.
  - Why `1.0` for masked logprobs? It represents an "obviously bad" probability (e^1.0 ≈ 2.718 > 1.0, which is invalid), making masked positions easy to identify during training.
  - Recommended: Use `ManagedServer` in your environment to automatically produce this format.
- Response:
  - Normal submission: `{"status": "received"}`
  - Mixed-size group buffered: `{"status": "buffered", "buffer_size": <sequences_in_buffer>}`
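The token/mask/logprob conventions above can be illustrated with a small sketch (the token IDs are made up; in practice `ManagedServer` produces this format for you):

```python
from typing import List, Tuple

def build_masked_sequence(prompt_tokens: List[int],
                          completion_tokens: List[int]) -> Tuple[List[int], List[int]]:
    """Return (tokens, masks): tokens is the full unmasked sequence;
    masks uses -100 at prompt positions and the actual token IDs at
    completion positions."""
    tokens = prompt_tokens + completion_tokens
    masks = [-100] * len(prompt_tokens) + list(completion_tokens)
    return tokens, masks

def build_inference_logprobs(prompt_len: int,
                             completion_logprobs: List[float]) -> List[float]:
    """Pad logprobs with 1.0 at masked (prompt) positions; since e^1.0 > 1
    is an impossible probability, masked slots stand out downstream."""
    return [1.0] * prompt_len + list(completion_logprobs)

tokens, masks = build_masked_sequence([101, 102], [7, 8, 9])
# tokens == [101, 102, 7, 8, 9]
# masks  == [-100, -100, 7, 8, 9]
```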
### `POST /scored_data_list`

- Description: Endpoint for Rollout Handlers to push a list of `ScoredData` chunks.
- Request Body: `List[ScoredData]`
- Response: `{"status": "received", "groups_processed": <count>}`
### `GET /batch`

- Description: Called by the Trainer to request a batch of data for training. The server uses internal logic to form a batch of the configured size from the available data in the queue. If any environments have minimum batch allocations specified, it uses `grab_batch_with_minimum_allocations` to ensure each such environment gets at least its minimum proportion of the batch; otherwise, it uses `grab_exact_from_heterogeneous_queue` to form batches respecting environment weights. The server increments its internal step counter when a batch is successfully formed and returned.
- Response:
  - Success: `{"batch": [<data_item_1>, ..., <data_item_N>]}`, where each `data_item` matches the structure pushed via `/scored_data`.
  - Not enough data: `{"batch": null}`
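A trainer-side polling loop for `/batch` might be sketched as follows (the URL, poll interval, and the `fetch` indirection are assumptions for illustration):

```python
import json
import time
import urllib.request
from typing import Callable, List, Optional

API_URL = "http://localhost:8000"  # assumed server address

def get_json(path: str) -> dict:
    with urllib.request.urlopen(f"{API_URL}{path}") as resp:
        return json.load(resp)

def next_batch(fetch: Callable[[], dict],
               poll_interval: float = 1.0,
               max_polls: Optional[int] = None) -> Optional[List[dict]]:
    """Poll /batch until a non-null batch is returned (or max_polls is hit)."""
    polls = 0
    while max_polls is None or polls < max_polls:
        batch = fetch().get("batch")
        if batch is not None:
            return batch  # list of data items as pushed via /scored_data
        polls += 1
        time.sleep(poll_interval)  # not enough data yet; wait and retry
    return None

if __name__ == "__main__":
    batch = next_batch(lambda: get_json("/batch"))
    # ... distribute to other ranks and perform a training step ...
```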
### `GET /latest_example`

- Description: Debug endpoint to retrieve the most recently added `ScoredData` item.
- Response: The last `ScoredData` dictionary pushed, or empty lists for tokens, masks, scores, advantages, ref_logprobs, inference_logprobs, generation_params, messages, and images if none have been pushed yet.
### `GET /reset_data`

- Description: Warning: Resets all server state, including the queue, configuration, registered environments, and step count. Use with caution during development/debugging.
- Response: Plain text `Reset successful` with HTTP status 200.
- Start Server: Launch the AtroposLib API server.
- Trainer Initialization: The main Trainer process sends a `POST /register` request with run parameters.
- Rollout Handler Initialization: Each Rollout Handler starts and sends `POST /register-env`.
- Data Generation: Handlers run simulations, collect data, and send `POST /scored_data` or `POST /scored_data_list` periodically.
- Training Loop:
  - The Trainer (e.g., Rank 0 in a distributed setup) enters a loop:
    - Calls `GET /batch`.
    - If `batch` is not `null`:
      - (Distribute the batch to other ranks if applicable.)
      - Perform a training step.
      - Optionally call `GET /status` for monitoring.
    - If `batch` is `null`:
      - Wait briefly (`time.sleep`) and retry `GET /batch`.
    - A mermaid diagram of how a trainer interacts with the API is located here.
  - (In distributed setups, other ranks (1..N-1) might poll `GET /status` to wait for the step counter to increment before expecting the broadcasted batch from Rank 0.)
  - The envs periodically poll `GET /status-env` to check their status and sampling weight.
    - In asynchronous setups, they may stop at a maximum off-policy step count.
  - A mermaid diagram of how a rollout handler interacts with the API is located here.
- Shutdown: Handlers may call `POST /disconnect-env`.
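From the rollout-handler side, the registration, data-generation, and shutdown steps above might be sketched like this (the URL, payload values, and the `post_json` helper are assumptions; error handling omitted):

```python
import json
import urllib.request

API_URL = "http://localhost:8000"  # assumed server address

def post_json(path: str, payload) -> dict:
    """Hypothetical helper: POST JSON and decode the JSON response."""
    req = urllib.request.Request(
        f"{API_URL}{path}",
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)

def make_scored_data(env_id: int, tokens, masks, scores) -> dict:
    """Minimal ScoredData payload; optional fields are omitted."""
    return {"tokens": tokens, "masks": masks, "scores": scores, "env_id": env_id}

if __name__ == "__main__":
    env = post_json("/register-env", {
        "max_token_length": 512, "desired_name": "demo_env",
        "weight": 1.0, "group_size": 2,
    })
    data = make_scored_data(
        env["env_id"],
        tokens=[[1, 2, 3, 4], [1, 2, 5, 6]],
        masks=[[-100, -100, 3, 4], [-100, -100, 5, 6]],
        scores=[0.5, -0.5],
    )
    post_json("/scored_data", data)  # push one group of trajectories
    post_json("/disconnect-env", {"env_id": env["env_id"]})  # graceful shutdown
```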
The API supports ensuring minimum batch allocations for specific environments. This feature is useful when you want to guarantee that certain environments contribute at least a minimum proportion of sequences to each training batch.
- Environment Registration: When registering an environment via `/register-env`, you can specify:
  - `min_batch_allocation` (Optional[float]): A value between 0.0 and 1.0 representing the minimum proportion of the batch this environment should contribute.
  - `group_size` (int): The expected number of sequences per data submission from this environment.
- Batch Formation: When the trainer requests a batch via `/batch`:
  - If any environment has a `min_batch_allocation` specified, the system uses special logic to ensure minimums are met.
  - The system attempts to allocate at least `min_batch_allocation * batch_size` sequences from each environment with a minimum.
  - If the sum of all minimum allocations exceeds 1.0, they are proportionally scaled down.
  - If an environment with a minimum allocation has no data available, batch formation fails (returns null).
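The proportional scale-down and per-environment minimum can be sketched as follows (a plausible reading of the described behavior, not necessarily the server's exact code; the flooring in `min_sequences` is an assumption):

```python
from typing import Dict

def scale_min_allocations(minimums: Dict[int, float]) -> Dict[int, float]:
    """If the minimum allocations sum to more than 1.0, scale them down
    proportionally so they sum to 1.0; otherwise return them unchanged."""
    total = sum(minimums.values())
    if total <= 1.0:
        return dict(minimums)
    return {env_id: m / total for env_id, m in minimums.items()}

def min_sequences(allocation: float, batch_size: int) -> int:
    """Sequences guaranteed per batch for one environment (rounding mode
    is an assumption)."""
    return int(allocation * batch_size)

scaled = scale_min_allocations({0: 0.6, 1: 0.6})
# 0.6 + 0.6 = 1.2 > 1.0, so each minimum becomes 0.6 / 1.2 = 0.5
```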
- Mixed-Size Group Handling: When an environment submits data with a different number of sequences than its declared `group_size`:
  - The data is buffered separately for that environment.
  - The system attempts to combine buffered groups to match the expected `group_size`.
  - Once combined, the data is added to the main queue.
  - The response includes `{"status": "buffered", "buffer_size": <sequences_in_buffer>}`.
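The buffering behavior might be approximated by a sketch like this (`GroupBuffer` is hypothetical; the server's actual combining logic may differ):

```python
from typing import List, Optional

class GroupBuffer:
    """Buffer per-environment sequences until a full group of
    `group_size` can be assembled (simplified sketch)."""

    def __init__(self, group_size: int):
        self.group_size = group_size
        self.buffer: List[dict] = []  # one entry per buffered sequence

    def add(self, sequences: List[dict]) -> Optional[List[dict]]:
        """Add a (possibly mis-sized) submission; return a full group
        once enough sequences are buffered, else None."""
        self.buffer.extend(sequences)
        if len(self.buffer) >= self.group_size:
            group = self.buffer[:self.group_size]
            self.buffer = self.buffer[self.group_size:]
            return group
        return None

buf = GroupBuffer(group_size=4)
assert buf.add([{"id": 1}, {"id": 2}]) is None  # only 2 of 4 buffered
full_group = buf.add([{"id": 3}, {"id": 4}])    # reaches 4 -> group emitted
```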
For example, two environments might register with these request bodies:

```python
# Environment 1: Requires at least 30% of each batch
{
    "max_token_length": 512,
    "desired_name": "critical_env",
    "weight": 1.0,
    "group_size": 4,
    "min_batch_allocation": 0.3,  # 30% minimum
}

# Environment 2: No minimum requirement
{
    "max_token_length": 512,
    "desired_name": "standard_env",
    "weight": 1.0,
    "group_size": 2,
    "min_batch_allocation": None,  # No minimum
}
```

- Minimum allocations are enforced per batch, not globally.
- If minimum allocations cannot be satisfied (e.g., not enough data from a required environment), batch formation fails
- Environments without `min_batch_allocation` fill the remaining batch space after minimums are satisfied.
- The feature respects heterogeneous packing constraints when forming batches.
- In-Memory State: The primary limitation is that all queues, configurations, and state are stored in the FastAPI application's memory (`app.state`).
  - No Persistence: Data is lost if the server restarts.
  - Scalability Bottleneck: The API cannot easily scale beyond a single server instance.