Skip to content

Commit d8d3491

Browse files
committed
[reward] feat: Add warning for ignored rate limits after initialization
- Introduced a warning mechanism in RateLimitedRewardManager to alert users when attempts are made to change global RPM/TPM settings after the class has been initialized with default values. - Updated the constructor to ensure that the warning is logged if new configurations are ignored due to prior initialization. - Enhanced test coverage to verify the warning behavior when changing rate limits post-initialization.
1 parent c03cc26 commit d8d3491

File tree

2 files changed

+63
-5
lines changed

2 files changed

+63
-5
lines changed

tests/experimental/reward_loop/test_rate_limited_reward_manager_on_cpu.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
import asyncio
16+
import logging
1617
import time
1718

1819
import pytest
@@ -469,6 +470,32 @@ async def test_class_initialization_once(self, tokenizer):
469470
# Should be the same object
470471
assert first_semaphore is second_semaphore
471472

473+
def test_warn_when_rate_limits_are_ignored_due_to_prior_init(self, tokenizer, caplog):
474+
"""Warn when a new config attempts to change global RPM/TPM after the class has been initialized."""
475+
caplog.set_level(logging.WARNING)
476+
477+
# First instantiation without a config (legacy signature) initializes global limiters with defaults.
478+
_ = RateLimitedRewardManager(
479+
tokenizer=tokenizer,
480+
compute_score=mock_async_reward_function,
481+
num_examine=0,
482+
reward_fn_key="data_source",
483+
)
484+
485+
# Second instantiation attempts to set RPM limits, but will be ignored due to global initialization.
486+
config = DictConfig({"reward_model": {"max_concurrent": 10, "max_rpm": 60, "timeout": 10.0}})
487+
_ = RateLimitedRewardManager(
488+
config=config,
489+
tokenizer=tokenizer,
490+
compute_score=mock_async_reward_function,
491+
)
492+
493+
assert any(
494+
"RateLimitedRewardManager has already been initialized" in record.getMessage()
495+
and "ignored" in record.getMessage()
496+
for record in caplog.records
497+
), "Expected a warning when attempting to change global rate limits after initialization."
498+
472499
@pytest.mark.asyncio
473500
async def test_extra_info_handling(self, tokenizer):
474501
"""Test that extra_info is properly passed to reward function."""

verl/experimental/reward_loop/reward_manager/limited.py

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -264,27 +264,58 @@ class RateLimitedRewardManager(RewardManagerBase):
264264
@classmethod
265265
def init_class(cls, config: DictConfig, tokenizer: AutoTokenizer):
266266
"""Initialize class state shared across all instances."""
267-
# Check if already initialized before calling parent
267+
# Check if already initialized before calling parent.
268+
#
269+
# NOTE: This class owns a *global*, class-level set of rate limiters. Once the class has been
270+
# initialized, subsequent instantiations cannot change the shared limiters. This is by design,
271+
# but it can be surprising (and dangerous) when the first initialization happens with default
272+
# values (often "unlimited") and later code tries to apply limits.
268273
if cls._class_initialized:
274+
rm_cfg = config.get("reward_model") or {}
275+
incoming_max_rpm = rm_cfg.get("max_rpm", None)
276+
incoming_max_tpm = rm_cfg.get("max_tpm", None)
277+
278+
# Warn when a caller is trying to change the global RPM/TPM limits after initialization.
279+
# This commonly happens if the first instance was created without a config (legacy signature),
280+
# which initializes the global limiters to their defaults and locks them in.
281+
if (incoming_max_rpm != cls._max_rpm) or (incoming_max_tpm != cls._max_tpm):
282+
if (
283+
incoming_max_rpm is not None
284+
or incoming_max_tpm is not None
285+
or cls._max_rpm is not None
286+
or cls._max_tpm is not None
287+
):
288+
logger.warning(
289+
"RateLimitedRewardManager has already been initialized and its rate limiters are shared "
290+
"globally across instances. The incoming (max_rpm/max_tpm) settings will be ignored. "
291+
"This can lead to unexpected behavior (e.g., exceeding API rate limits) if the first "
292+
"initialization used defaults (often unlimited). "
293+
f"Existing: max_rpm={cls._max_rpm}, max_tpm={cls._max_tpm}. "
294+
f"Incoming: max_rpm={incoming_max_rpm}, max_tpm={incoming_max_tpm}. "
295+
"To apply different limits, ensure the first RateLimitedRewardManager created in this "
296+
"process uses the desired configuration (or restart/reset the process)."
297+
)
269298
return
270299

271300
super().init_class(config, tokenizer)
272301

302+
rm_cfg = config.get("reward_model") or {}
303+
273304
# Concurrency limiter
274-
cls._max_concurrent = config.reward_model.get("max_concurrent", 1)
305+
cls._max_concurrent = rm_cfg.get("max_concurrent", 1)
275306
cls._semaphore = asyncio.Semaphore(cls._max_concurrent)
276307

277308
# Request rate limiter (RPM)
278-
cls._max_rpm = config.reward_model.get("max_rpm", None)
309+
cls._max_rpm = rm_cfg.get("max_rpm", None)
279310
if cls._max_rpm is not None:
280311
requests_per_second = cls._max_rpm / 60.0
281312
cls._rpm_limiter = AsyncTokenBucket(rate_limit=requests_per_second, max_tokens=requests_per_second)
282313
else:
283314
cls._rpm_limiter = None
284315

285316
# Token rate limiter (TPM)
286-
cls._max_tpm = config.reward_model.get("max_tpm", None)
287-
cls._estimated_tokens_per_request = config.reward_model.get("estimated_tokens_per_request", 2000)
317+
cls._max_tpm = rm_cfg.get("max_tpm", None)
318+
cls._estimated_tokens_per_request = rm_cfg.get("estimated_tokens_per_request", 2000)
288319
if cls._max_tpm is not None:
289320
tokens_per_second = cls._max_tpm / 60.0
290321
cls._tpm_limiter = AsyncTokenBucket(rate_limit=tokens_per_second, max_tokens=tokens_per_second)

0 commit comments

Comments
 (0)