8 changes: 8 additions & 0 deletions AGENTS.md
@@ -252,6 +252,14 @@ Improve pass:

- `TrainableItem` should not pickle / serialize tokenizer instances; drop them in `__getstate__`
- `model_name` often includes `exp_id`; persist a template like `deployed-model-{EXP_ID}` so resume survives `exp_id` changes
- A temporary synthetic-latency harness was useful for proving that fully-async can outperform one-step-off in a balanced-base regime, but that experiment-only sleep code has been removed from the shared RLVR files. Keep future synthetic-latency instrumentation outside the main experiment code unless it is intended to stay.
- In `steptronoss/core/generators/flow_controller_simulator.py`, `max_concurrent` now means real concurrent prompt progress and is applied consistently across strategies: simple strategies treat an explicit `max_concurrent` as a per-block infer-concurrency cap, and `fully-async` advances every running prompt each tick up to that cap (fixing the old single-prompt RR service bug).
- For short RLVR timing A/Bs (`train_iters` around 7), `fully-async` can look worse even when its simulator steady state is better, because the final drain/tail batch dominates the average. For flow-policy comparisons, prefer more iterations or compare middle steady-state steps separately from warmup/drain.
- `steptronoss/core/generators/flow_controller.py` runs rollout blocks sequentially on the inference side but can overlap the next block with trainer compute. Within a block, prompts are submitted concurrently and block duration is effectively bounded by the slowest prompt.
- `SimpleFlowController` version scheduling is `0, 1, 2, ...` for `on-policy` and `0, 0, 1, 2, ...` for `one-step-off`; after warm-up, `one-step-off` typically trains iter `k` on rollouts generated by actor version `k-1`.
- `FlowControllerConfig` is now the simple-controller base for `on-policy` / `one-step-off`, while `FullyAsyncFlowControllerConfig` carries `fully-async`-specific knobs (`max_untrained_prompts`, `max_staleness`) and dispatches to `FullyAsyncFlowController`. The current fully-async implementation is a first pass: it schedules prompts with backpressure, gates version bumps by staleness, and yields once `pre_train` has `prompt_per_iter` prompt results, but it should still be validated against the simulator before behavior-changing edits.
- `steptronoss/core/generators/flow_controller_simulator.py` is now the safest place to reason about new fully-async rollout rules first. Its `fully-async` path models `prompt_per_iter`, `max_untrained_prompts`, `max_staleness`, and an explicit `max_concurrent` concurrent-prompt scheduler before the real controller exists.
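
To illustrate the short-run timing caveat above, the sketch below compares a whole-run average with a steady-state average that trims warmup and drain steps. The per-step times are made-up numbers chosen only to show the effect, and `steady_state_mean` with its `warmup` / `drain` parameters is a hypothetical helper, not something in the repo.

```python
from statistics import mean


def steady_state_mean(step_times, warmup=1, drain=1):
    """Mean of the middle steps, excluding warmup and drain/tail steps."""
    if len(step_times) <= warmup + drain:
        raise ValueError("not enough steps to trim warmup and drain")
    return mean(step_times[warmup:len(step_times) - drain])


# Hypothetical per-iteration times in seconds for a 7-iteration run.
# These are illustrative values, not measurements.
fully_async = [12.0, 8.0, 8.0, 8.0, 8.0, 8.0, 20.0]
one_step_off = [10.0, 9.0, 9.0, 9.0, 9.0, 9.0, 9.0]

# Whole-run average: the long final drain step makes fully-async look worse.
whole_run = (mean(fully_async), mean(one_step_off))

# Steady-state average: the middle steps tell the opposite story.
steady = (steady_state_mean(fully_async), steady_state_mean(one_step_off))

print("whole run:", whole_run)
print("steady state:", steady)
```

With only 7 iterations, one slow tail step is a large share of the average, which is why a longer run or a trimmed comparison is likely to be more informative.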

## 8. Optimization Guidance

46 changes: 46 additions & 0 deletions playground/rlvr/qwen3_1p5b_rlvr_math_fully_async.py
@@ -0,0 +1,46 @@
"""Fully-async RLVR variant for flow-controller A/B experiments.

Historical note
---------------
Before removing the temporary synthetic-latency harness from the shared RLVR
experiment code, a balanced-base stress test showed that fully-async can beat
one-step-off once the remaining variance is concentrated in a stronger long
tail. In that harness, `prompt_per_iter=10`, infer/train were balanced, and
the fully-async point `max_untrained_prompts=22`, `max_staleness=2`,
`max_concurrent_genables=10` improved iteration time by roughly:

- `~5-8%` under the milder long-tail setting
- `~20-23%` after further increasing the long-tail strength

Those numbers are retained here as design context for future fully-async A/B
work, but the temporary synthetic-sleep code used to produce them has been
removed from the main RLVR experiment files.
"""

from playground.rlvr.qwen3_1p5b_rlvr_math import Exp as BaseExp
from playground.rlvr.qwen3_1p5b_rlvr_math import RLVRTrainerConfig
from steptronoss.exp.rl import FullyAsyncFlowControllerConfig


class FullyAsyncMathFlowControllerConfig(FullyAsyncFlowControllerConfig):
    def __init__(self):
        super().__init__()
        self.prompt_per_iter = 16
        self.max_untrained_prompts = 32
        self.max_staleness = 2


class FullyAsyncRLVRTrainerConfig(RLVRTrainerConfig):
    flow_cfg = FullyAsyncMathFlowControllerConfig


class Exp(BaseExp):
    trainer_cfg: FullyAsyncRLVRTrainerConfig = FullyAsyncRLVRTrainerConfig

    def __init__(self):
        super().__init__()
        self.checkpoint_cfg.save_path = "/oss/checkpoints/qwen3_1p5b_rlvr_math_fully_async"


if __name__ == "__main__":
    Exp().entrypoint()
200 changes: 200 additions & 0 deletions steptronoss/core/generators/README.md
@@ -0,0 +1,200 @@
# Generators

This directory currently contains the PPO/RL rollout flow controller:
`steptronoss/core/generators/flow_controller.py`.

## What `flow_controller.py` does

`SimpleFlowController` is the rank-0 rollout orchestration layer between:

- the prompt source (`TrainerPromptStream` / dataloader),
- asynchronous generation (`GenerationController`),
- the hot-reloaded inference model (`vLLM`),
- and PPO training (`PPOTrainer`).

From the trainer's perspective, the interface is intentionally small:

- `start(...)`: attach the prompt source and actor model, then start background workers
- `get_train_samples()`: return all rollout trajectories for exactly one train iteration
- `state_dict()` / `_load_state_dict(...)`: persist and restore in-flight flow state

The trainer call path is:

`PPOTrainer.start()` -> `flow_controller.start(...)` -> background scheduling/generation
threads -> `PPOTrainer.generate_trajectory()` -> `flow_controller.get_train_samples()`.
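
The trainer-facing surface described above can be sketched as a small protocol plus a stub. This is only an illustration: the argument names of `start`, the `FlowControllerLike` protocol, `StubFlowController`, and `run_trainer` are all hypothetical, and the real signatures may differ.

```python
from typing import Any, Protocol


class FlowControllerLike(Protocol):
    """Trainer-facing surface; argument names are illustrative assumptions."""

    def start(self, prompt_source: Any, actor: Any) -> None: ...
    def get_train_samples(self) -> list[Any]: ...
    def state_dict(self) -> dict[str, Any]: ...


class StubFlowController:
    """Hypothetical stand-in that serves canned rollouts, one list per iteration."""

    def __init__(self, batches: list[list[str]]) -> None:
        self._batches = list(batches)
        self.started = False
        self.served = 0

    def start(self, prompt_source: Any, actor: Any) -> None:
        # The real controller would launch its background workers here.
        self.started = True

    def get_train_samples(self) -> list[Any]:
        batch = self._batches[self.served]
        self.served += 1
        return batch

    def state_dict(self) -> dict[str, Any]:
        return {"served": self.served}


def run_trainer(controller: FlowControllerLike, num_iters: int) -> list[list[Any]]:
    """Mimic the call path: start once, then one get_train_samples() per iteration."""
    controller.start(prompt_source=None, actor=None)
    return [controller.get_train_samples() for _ in range(num_iters)]


stub = StubFlowController([["traj-a"], ["traj-b", "traj-c"]])
consumed = run_trainer(stub, num_iters=2)
print(consumed, stub.state_dict())
```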

## Runtime model

`SimpleFlowController` builds a checkpointable `PersistentFlow` with three stages:

- `source`: prompt source backed by `TrainerPromptStream`
- `pre_gen`: prompts and control signals waiting for generation
- `pre_train`: generated trajectories waiting to be consumed by training

Two daemon threads run on rank 0:

- `_control_worker()`: schedules `need_version -> prompts -> train` blocks into `pre_gen`
- `_generation_worker()`: executes those blocks, waits for the right inference
weights, launches async generation, and pushes completed trajectories into
`pre_train`
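
As a rough mental model of the two workers, the toy pipeline below uses plain `queue.Queue` objects in place of `PersistentFlow`. The tuple-based item encoding, the `stop` sentinel, and the constants are assumptions made for this sketch; generation is faked with a string and there is no ack or checkpoint logic.

```python
import queue
import threading

PROMPT_PER_ITER = 3  # hypothetical value for this sketch
NUM_BLOCKS = 2       # hypothetical number of rollout blocks to schedule

pre_gen: queue.Queue = queue.Queue()
pre_train: queue.Queue = queue.Queue()


def control_worker() -> None:
    # Schedule blocks of the form: need_version -> prompts -> train.
    for version in range(NUM_BLOCKS):
        pre_gen.put(("need_version", version))
        for i in range(PROMPT_PER_ITER):
            pre_gen.put(("prompt", f"v{version}-p{i}"))
        pre_gen.put(("train", None))
    pre_gen.put(("stop", None))  # sentinel so the toy worker can exit


def generation_worker() -> None:
    while True:
        kind, payload = pre_gen.get()
        if kind == "stop":
            break
        if kind == "need_version":
            continue  # the real worker waits for inference weights here
        if kind == "prompt":
            pre_train.put(("trajectories", f"rollout({payload})"))
        else:  # train barrier: everything before it has been pushed
            pre_train.put(("train", None))


def get_train_samples() -> list:
    # Collect trajectories until the train signal closes the block.
    batch = []
    while True:
        kind, payload = pre_train.get()
        if kind == "train":
            return batch
        batch.append(payload)


workers = [
    threading.Thread(target=control_worker, daemon=True),
    threading.Thread(target=generation_worker, daemon=True),
]
for w in workers:
    w.start()
for w in workers:
    w.join()

first_block = get_train_samples()
second_block = get_train_samples()
print(first_block)
print(second_block)
```

Because the toy generation step is synchronous, trajectories arrive in prompt order; with real asynchronous generation the order inside a block would not be guaranteed.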

## On-policy timing

The most important behavior is that `get_train_samples()` is both:

- a request for the next batch of rollout trajectories, and
- a synchronization point that hot-reloads the latest actor weights into vLLM

For the default `on-policy` mode, one train iteration looks like this:

```mermaid
sequenceDiagram
participant T as PPOTrainer
participant FC as SimpleFlowController
participant CW as _control_worker
participant F as PersistentFlow
participant GW as _generation_worker
participant GC as GenerationController
participant V as vLLM
participant S as TrainerPromptStream

Note over FC: start() creates PersistentFlow and launches both workers on rank 0

loop schedule future rollout blocks
CW->>F: put need_version(v)
loop prompt_per_iter times
CW->>S: pop next prompt
S-->>CW: GenableItem
CW->>F: put prompt
end
CW->>F: put train barrier
end

T->>FC: get_train_samples()
FC->>FC: train_weight_version += 1
FC->>V: deploy_training_model(actor.models)
Note over FC,V: infer_weight_version = train_weight_version

loop drain one rollout block
GW->>F: get next item from pre_gen
alt need_version(v)
GW->>GW: wait until infer_weight_version >= v
else prompt
GW->>GC: submit_with_callback(prompt, for_train=True)
GC->>V: generate_for_train()
V-->>GC: list[EnvTrajectory]
GC-->>GW: callback(result)
GW->>F: put trajectories into pre_train
GW->>F: ack prompt in pre_gen
else train barrier
GW->>GW: wait until all pending prompts finish
GW->>F: put train signal into pre_train
GW->>F: ack train barrier and version request
end
FC->>F: get next item from pre_train
alt trajectories
FC->>FC: assign one shared prompt_id per prompt group
else train signal
FC-->>T: return trajectories for this train iter
end
end
```

## One-step-off timing

`one-step-off` uses the same machinery, but the scheduler warm-starts with two
consecutive rollout blocks that both require inference weight version `0`.

That means:

- iter 0 still trains on rollouts generated by version `0`
- after warm-up, iter `k` usually trains on rollouts generated by version `k-1`
- meanwhile vLLM can already be hot-reloaded to version `k` before those stale
rollouts are consumed

```mermaid
sequenceDiagram
participant T0 as PPOTrainer iter 0
participant T1 as PPOTrainer iter 1
participant FC as SimpleFlowController
participant F as PersistentFlow
participant GW as _generation_worker
participant V as vLLM

Note over FC,F: Warm start in one-step-off mode
FC->>F: schedule Block A = need_version(0), prompts..., train
FC->>F: schedule Block B = need_version(0), prompts..., train

T0->>FC: get_train_samples()
FC->>V: sync actor to version 0
GW->>F: run Block A under version 0
GW->>F: run Block B under version 0 (may overlap)
FC-->>T0: return Block A rollouts

Note over T0,FC: Trainer updates actor during iter 0

T1->>FC: get_train_samples()
FC->>V: sync actor to version 1
FC-->>T1: return Block B rollouts
Note over T1: Iter 1 consumes rollouts generated by version 0

FC->>F: future blocks continue as need_version(1), need_version(2), ...
```

## Scheduling semantics

The controller uses explicit weight versions to define how fresh inference must
be before a prompt can run.

- `train_weight_version`: latest training-side actor version
- `infer_weight_version`: latest version already hot-reloaded into vLLM
- `need_version(v)`: generation barrier meaning "do not run later prompts until
vLLM has at least version `v`"

Current strategies in `SimpleFlowController`:

- `on-policy`: scheduled version sequence is `0, 1, 2, ...`
- `one-step-off`: scheduled version sequence is `0, 0, 1, 2, ...`

In practice:

- `on-policy`: iter `k` trains on rollouts generated by version `k`
- `one-step-off`: after the initial warm-up, iter `k` trains on rollouts
generated by version `k-1`

That is why `one-step-off` can keep one extra rollout block in flight using the
previous actor weights, while `on-policy` advances one block per weight version.
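
The two schedules can be written down as small generators to check the iteration-to-version mapping. `scheduled_versions` and `rollout_version_for_iter` are hypothetical helper names used only for this sketch.

```python
from itertools import count, islice


def scheduled_versions(strategy: str):
    """Yield the required inference version for each successive rollout block."""
    if strategy == "on-policy":
        yield from count(0)  # 0, 1, 2, ...
    elif strategy == "one-step-off":
        yield 0              # extra warm-up block on version 0
        yield from count(0)  # then 0, 1, 2, ...
    else:
        raise ValueError(f"unknown strategy: {strategy}")


def rollout_version_for_iter(strategy: str, k: int) -> int:
    """Version that generated the rollouts consumed by train iteration k."""
    return next(islice(scheduled_versions(strategy), k, None))


on_policy = list(islice(scheduled_versions("on-policy"), 5))
one_step_off = list(islice(scheduled_versions("one-step-off"), 5))
print(on_policy)     # [0, 1, 2, 3, 4]
print(one_step_off)  # [0, 0, 1, 2, 3]
```

For `one-step-off`, block `k` carries version `k-1` for every `k >= 1`, which matches the "after warm-up" rule above.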

## Checkpoint / resume semantics

The flow controller is intentionally part of trainer checkpointing.

- `state_dict()` stores:
  - `train_weight_version`
  - `infer_weight_version`
  - the whole `PersistentFlow` queue state
- `PersistentQueue.state_dict()` includes both queued items and already-popped
but not-yet-acked items, so resumes keep unfinished work instead of silently
dropping it
- `weight_dumped()` only acks the `pre_train` items that were already handed to
the trainer after checkpoint dumping succeeds
- `_load_state_dict(...)` resets `infer_weight_version` to `-1` on restore, so
resume conservatively re-syncs weights to inference
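
The "popped but not yet acked" rule can be illustrated with a toy queue. `ToyPersistentQueue` is a hypothetical stand-in written for this sketch; the real `PersistentQueue` API and state layout are not shown here and may differ.

```python
from collections import deque


class ToyPersistentQueue:
    """Toy queue whose checkpoint keeps popped-but-unacked items."""

    def __init__(self) -> None:
        self._queued = deque()  # (item_id, item) pairs not yet popped
        self._unacked = {}      # item_id -> item, popped but not acked
        self._next_id = 0

    def put(self, item) -> None:
        self._queued.append((self._next_id, item))
        self._next_id += 1

    def pop(self):
        item_id, item = self._queued.popleft()
        self._unacked[item_id] = item
        return item_id, item

    def ack(self, item_id: int) -> None:
        # Only acked items are truly finished and may be forgotten.
        del self._unacked[item_id]

    def state_dict(self) -> dict:
        # Unacked items go first so a resume replays them in original order.
        pending = sorted(self._unacked.items()) + list(self._queued)
        return {"items": pending, "next_id": self._next_id}

    @classmethod
    def from_state_dict(cls, state: dict) -> "ToyPersistentQueue":
        restored = cls()
        restored._queued = deque(tuple(entry) for entry in state["items"])
        restored._next_id = state["next_id"]
        return restored


q = ToyPersistentQueue()
for name in ("a", "b", "c"):
    q.put(name)

id_a, _ = q.pop()
id_b, _ = q.pop()  # popped, but never acked before the checkpoint
q.ack(id_a)

snapshot = q.state_dict()
resumed = ToyPersistentQueue.from_state_dict(snapshot)
replayed = resumed.pop()
print(snapshot["items"], replayed)
```

Item `b` was in flight when the snapshot was taken, so it is the first thing the resumed queue hands out again.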

## Important current limitation

`FlowControllerConfig` covers the simple `on-policy` / `one-step-off` modes.
`FullyAsyncFlowControllerConfig` owns the `fully-async`-specific knobs such as
`max_untrained_prompts` and `max_staleness`. The repo now has an initial
`FullyAsyncFlowController` implementation for that mode. It already does prompt
scheduling, async generation, version-gated weight updates, and
prompt-count-based yielding, but it is still early and should be treated as a
first pass guided by the simulator rather than a battle-tested production path.
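
The three fully-async rules named above (backpressure, staleness-gated version bumps, prompt-count-based yielding) might be expressed as simple predicates. This is a sketch under assumptions: the exact definitions of "untrained prompts" and "staleness" used by `FullyAsyncFlowController` are not given here, so the comparisons below are one plausible reading, and `ToyFullyAsyncState` is a hypothetical name. The default knob values mirror the playground config in this change.

```python
from dataclasses import dataclass


@dataclass
class ToyFullyAsyncState:
    # Knob values mirror the playground config; the semantics are assumed.
    prompt_per_iter: int = 16
    max_untrained_prompts: int = 32
    max_staleness: int = 2
    # Bookkeeping fields invented for this sketch.
    untrained_prompts: int = 0        # scheduled but not yet trained on
    ready_prompts: int = 0            # finished prompt results in pre_train
    oldest_inflight_version: int = 0  # oldest version still generating
    infer_weight_version: int = 0

    def can_schedule_prompt(self) -> bool:
        """Backpressure: stop scheduling once too many prompts are untrained."""
        return self.untrained_prompts < self.max_untrained_prompts

    def can_bump_version(self) -> bool:
        """Staleness gate: the next version must not outrun in-flight work."""
        next_version = self.infer_weight_version + 1
        return next_version - self.oldest_inflight_version <= self.max_staleness

    def can_yield_batch(self) -> bool:
        """Yield to the trainer once enough prompt results are ready."""
        return self.ready_prompts >= self.prompt_per_iter


state = ToyFullyAsyncState(untrained_prompts=32, ready_prompts=16,
                           oldest_inflight_version=0, infer_weight_version=2)
print(state.can_schedule_prompt(), state.can_bump_version(), state.can_yield_batch())
```

Checking candidate rules like these against the simulator first is in line with the guidance above.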

If you extend this module, review these pieces together:

- `sync_weight()` and `VLLMDeployConfig.deploy_training_model(...)`
- `_control_worker()` scheduling and version bookkeeping
- `_generation_worker()` barrier handling
- `PersistentFlow` ack/restore behavior
- checkpoint call sites in `PPOTrainer.save_checkpoint()`