Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
fe0fd0e
more
fzyzcjy Jan 1, 2026
689008f
more
fzyzcjy Jan 1, 2026
372eec4
more
fzyzcjy Jan 1, 2026
3e86e19
more
fzyzcjy Jan 1, 2026
dafba0c
more
fzyzcjy Jan 1, 2026
8b39b71
more
fzyzcjy Jan 1, 2026
e40d02b
more
fzyzcjy Jan 1, 2026
78b2e0f
more
fzyzcjy Jan 1, 2026
f31b423
more
fzyzcjy Jan 1, 2026
7842218
more
fzyzcjy Jan 1, 2026
538cfee
more
fzyzcjy Jan 1, 2026
9823c57
more
fzyzcjy Jan 1, 2026
bfa32ed
more
fzyzcjy Jan 1, 2026
d9e3cf3
more
fzyzcjy Jan 1, 2026
39e8858
temp
fzyzcjy Jan 1, 2026
e8152e8
more
fzyzcjy Jan 1, 2026
3990e55
more
fzyzcjy Jan 1, 2026
60ea625
print
fzyzcjy Jan 1, 2026
c933872
Revert "temp"
fzyzcjy Jan 1, 2026
165dbc4
Revert "print"
fzyzcjy Jan 1, 2026
5553b02
more
fzyzcjy Jan 1, 2026
c18abf2
fmt
fzyzcjy Jan 1, 2026
3283c84
more
fzyzcjy Jan 1, 2026
30c7529
more
fzyzcjy Jan 1, 2026
3f8cdeb
more
fzyzcjy Jan 1, 2026
899110d
more
fzyzcjy Jan 1, 2026
9a0f670
more
fzyzcjy Jan 1, 2026
ca73c8b
more
fzyzcjy Jan 1, 2026
f2029d8
more
fzyzcjy Jan 1, 2026
9b056f0
more
fzyzcjy Jan 1, 2026
f21b36e
more
fzyzcjy Jan 1, 2026
892905b
more
fzyzcjy Jan 1, 2026
11cf924
more
fzyzcjy Jan 1, 2026
43a3b87
more
fzyzcjy Jan 1, 2026
88a517f
more
fzyzcjy Jan 1, 2026
893174a
temp print
fzyzcjy Jan 1, 2026
ad4f45a
more
fzyzcjy Jan 1, 2026
e37e276
Revert "temp print"
fzyzcjy Jan 1, 2026
57e3718
more
fzyzcjy Jan 1, 2026
9db3dc2
more
fzyzcjy Jan 1, 2026
b777cce
more
fzyzcjy Jan 1, 2026
4f5a5a9
more
fzyzcjy Jan 1, 2026
362bee4
more
fzyzcjy Jan 1, 2026
a5e8f14
more
fzyzcjy Jan 1, 2026
1f95a87
fmt
fzyzcjy Jan 1, 2026
20c93a9
more
fzyzcjy Jan 1, 2026
ed44a5d
more
fzyzcjy Jan 1, 2026
6500cd3
more
fzyzcjy Jan 1, 2026
3616f35
more
fzyzcjy Jan 1, 2026
70a4ad4
more
fzyzcjy Jan 1, 2026
652508c
Revert "more"
fzyzcjy Jan 1, 2026
82dc039
print
fzyzcjy Jan 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions python/sglang/srt/environ.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ class Envs:
SGLANG_DYNAMIC_CHUNKING_SMOOTH_FACTOR = EnvFloat(0.75)
SGLANG_SCHEDULER_SKIP_ALL_GATHER = EnvBool(False)
SGLANG_SCHEDULER_DECREASE_PREFILL_IDLE = EnvBool(False)
SGLANG_PREFILL_DELAYER_MAX_DELAY_PASSES = EnvInt(30)
SGLANG_DATA_PARALLEL_BUDGET_INTERVAL = EnvInt(1)

# Test: pd-disaggregation
Expand Down
66 changes: 66 additions & 0 deletions python/sglang/srt/managers/prefill_delayer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import logging

import torch

from sglang.srt.environ import envs


logger = logging.getLogger(__name__)


class PrefillDelayer:
    """Vote to postpone prefill across DP ranks to reduce prefill-induced idling.

    Under DP attention, if only some ranks launch a prefill batch, the other
    ranks are forced through (mostly) idle forward passes. This helper gathers
    every rank's (can_prefill, is_idle) state over the CPU process group and
    postpones prefill while ranks disagree — i.e. some rank can prefill and
    some rank cannot — as long as no rank is idle. To avoid starvation, the
    postponement is capped at ``SGLANG_PREFILL_DELAYER_MAX_DELAY_PASSES``
    consecutive passes.
    """

    def __init__(self, dp_size, attn_tp_size, tp_worker, server_args):
        # One (can_prefill, is_idle) pair per (dp_rank, attn_tp_rank); filled
        # in place by the all_gather in _gather_info on every call.
        self.global_info = torch.empty(
            (dp_size, attn_tp_size, 2),
            dtype=torch.int64,
            device="cpu",
        )
        # Gloo (CPU) group so the collective does not touch GPU streams.
        self.cpu_group = tp_worker.get_tp_group().cpu_group

        # Number of consecutive passes prefill has been postponed so far.
        self.curr_delayed_count = 0
        # Upper bound on consecutive postponements before prefill is forced.
        self.max_delay_passes = envs.SGLANG_PREFILL_DELAYER_MAX_DELAY_PASSES.get()

        assert (
            server_args.schedule_policy == "fcfs"
        ), f"To use SCHEDULER_DECREASE_PREFILL_IDLE, schedule_policy must be 'fcfs'. '{server_args.schedule_policy}' is not supported."
        assert (
            server_args.enable_dp_attention
        ), "To use SCHEDULER_DECREASE_PREFILL_IDLE, enable_dp_attention must be enabled."
        assert (
            server_args.disaggregation_mode == "null"
        ), "To use SCHEDULER_DECREASE_PREFILL_IDLE, disaggregation_mode must be null."
        assert (
            not server_args.disable_overlap_schedule
        ), "To use SCHEDULER_DECREASE_PREFILL_IDLE, disable_overlap_schedule must be False."

    def _gather_info(self, local_can_prefill: int, local_is_idle: bool):
        """All-gather (can_prefill, is_idle) from every rank.

        Returns a view of shape (dp_size, 2) holding only attn-tp rank 0's
        entry for each DP rank (all attn-tp ranks of a DP group share state).
        """
        local_info = torch.tensor(
            [local_can_prefill, int(local_is_idle)],
            device="cpu",
            dtype=torch.int64,
        )
        # global_info is contiguous, so flatten() is a view and the collective
        # writes straight into the pre-allocated buffer.
        torch.distributed.all_gather_into_tensor(
            self.global_info.flatten(),
            local_info,
            group=self.cpu_group,
        )
        tp0_info = self.global_info[:, 0, :]
        return tp0_info

    def should_allow_prefill(self, local_can_prefill: int, local_is_idle: bool) -> bool:
        """Return True if this pass may launch a prefill batch.

        Contains a collective, so it must be called on every rank each pass
        with that rank's local state.
        """
        tp0_info = self._gather_info(
            local_can_prefill=local_can_prefill, local_is_idle=local_is_idle
        )
        global_can_prefill = tp0_info[:, 0]
        global_exists_cannot_prefill = global_can_prefill.min().item() == 0
        global_exists_can_prefill = global_can_prefill.max().item() > 0
        global_exists_idle = bool(tp0_info[:, 1].max().item())

        # Postpone only when ranks disagree (some can prefill, some cannot)
        # while nobody is idle; in every other situation prefill immediately.
        if (
            (not global_exists_idle)
            and global_exists_cannot_prefill
            and global_exists_can_prefill
        ):
            self.curr_delayed_count += 1
            if self.curr_delayed_count < self.max_delay_passes:
                logger.debug(
                    "PrefillDelayer: postpone prefill (pass %d/%d)",
                    self.curr_delayed_count,
                    self.max_delay_passes,
                )
                return False
            # Delay budget exhausted: force prefill to avoid starving requests.
            logger.debug(
                "PrefillDelayer: stop postponing prefill after %d passes",
                self.curr_delayed_count,
            )

        self.curr_delayed_count = 0
        return True
42 changes: 22 additions & 20 deletions python/sglang/srt/managers/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
)
from sglang.srt.managers.mm_utils import init_mm_embedding_cache
from sglang.srt.managers.overlap_utils import FutureMap
from sglang.srt.managers.prefill_delayer import PrefillDelayer
from sglang.srt.managers.schedule_batch import (
FINISH_ABORT,
ModelWorkerBatch,
Expand All @@ -133,7 +134,6 @@
SchedulePolicy,
)
from sglang.srt.managers.scheduler_dp_attn_mixin import SchedulerDPAttnMixin
from sglang.srt.managers.scheduler_enhancer import SchedulerEnhancer
from sglang.srt.managers.scheduler_input_blocker import SchedulerInputBlocker
from sglang.srt.managers.scheduler_metrics_mixin import (
RECORD_STEP_TIME,
Expand Down Expand Up @@ -203,7 +203,6 @@
TEST_RETRACT = envs.SGLANG_TEST_RETRACT.get()
TEST_RETRACT_INTERVAL = envs.SGLANG_TEST_RETRACT_INTERVAL.get()
TEST_RETRACT_NO_PREFILL_BS = envs.SGLANG_TEST_RETRACT_NO_PREFILL_BS.get()
SCHEDULER_DECREASE_PREFILL_IDLE = envs.SGLANG_SCHEDULER_DECREASE_PREFILL_IDLE.get()
GRAMMAR_TIMEOUT = float(os.environ.get("SGLANG_GRAMMAR_TIMEOUT", 300))


Expand Down Expand Up @@ -793,14 +792,13 @@ def init_schedule_policy(self):
self.enable_priority_scheduling,
self.schedule_low_priority_values_first,
)
self.schedule_enhancer = None
if SCHEDULER_DECREASE_PREFILL_IDLE:
self.schedule_enhancer = SchedulerEnhancer(
self.dp_size,
self.attn_tp_size,
self.tp_worker,
self.max_running_requests,
self.server_args,
self.prefill_delayer: Optional[PrefillDelayer] = None
if envs.SGLANG_SCHEDULER_DECREASE_PREFILL_IDLE.get():
self.prefill_delayer = PrefillDelayer(
dp_size=self.dp_size,
attn_tp_size=self.attn_tp_size,
tp_worker=self.tp_worker,
server_args=self.server_args,
)
# Enable preemption for priority scheduling.
self.try_preemption = self.enable_priority_scheduling
Expand Down Expand Up @@ -1870,12 +1868,6 @@ def get_num_allocatable_reqs(self, running_bs):
return res

def get_new_batch_prefill(self) -> Optional[ScheduleBatch]:
if self.schedule_enhancer and not self.schedule_enhancer.get_schedule_decision(
self.running_batch
):
# Decrease prefill idle as much as possible during high dp load.
return None

# Check if the grammar is ready in the grammar queue
if self.grammar_queue:
self.move_ready_grammar_requests()
Expand All @@ -1884,10 +1876,20 @@ def get_new_batch_prefill(self) -> Optional[ScheduleBatch]:
# Reset batch_is_full to try preemption with a prefill adder.
self.running_batch.batch_is_full = False

# Handle the cases where prefill is not allowed
if (
self.running_batch.batch_is_full or len(self.waiting_queue) == 0
) and self.chunked_req is None:
# The `should_allow_prefill` needs to be called on all ranks since contains communication
delayer_allow_prefill = (
self.prefill_delayer.should_allow_prefill(
# TODO: consider offline generation cases when there are a lot of waiting requests
local_can_prefill=len(self.waiting_queue) > 0,
local_is_idle=len(self.running_batch.reqs) == 0,
)
if self.prefill_delayer
else True
)
if (not delayer_allow_prefill) or (
(self.running_batch.batch_is_full or len(self.waiting_queue) == 0)
and self.chunked_req is None
):
return None

running_bs = len(self.running_batch.reqs)
Expand Down
59 changes: 0 additions & 59 deletions python/sglang/srt/managers/scheduler_enhancer.py

This file was deleted.

1 change: 1 addition & 0 deletions test/srt/run_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@
TestFile("test_profile_v2.py"),
TestFile("models/test_ministral3_models.py"),
TestFile("test_mistral_large3_basic.py"),
TestFile("test_prefill_delayer.py"),
],
}

Expand Down
69 changes: 69 additions & 0 deletions test/srt/test_prefill_delayer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import unittest

from sglang.bench_serving import run_benchmark
from sglang.srt.environ import envs
from sglang.srt.utils import kill_process_tree
from sglang.test.test_utils import (
DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
DEFAULT_URL_FOR_TEST,
CustomTestCase,
get_benchmark_args,
popen_launch_server,
)


class TestPrefillDelayerThroughput(CustomTestCase):
    """End-to-end throughput check for the prefill delayer.

    Launches a dp-attention server twice — once with the prefill delayer
    enabled and once without — runs the same random benchmark against each,
    and prints the resulting throughput for manual comparison. A sanity
    assertion ensures the benchmark actually completed and produced tokens.
    """

    def _run_throughput_test(self, with_prefill_delayer: bool):
        """Launch the server, run the benchmark, print and sanity-check throughput.

        Args:
            with_prefill_delayer: whether SGLANG_SCHEDULER_DECREASE_PREFILL_IDLE
                is enabled for this server instance.
        """
        model = "Qwen/Qwen3-0.6B"
        base_url = DEFAULT_URL_FOR_TEST
        other_args = [
            "--trust-remote-code",
            "--tp",
            "8",
            "--enable-dp-attention",
            "--dp",
            "8",
        ]

        # TODO further fix mem leak
        with envs.SGLANG_SCHEDULER_DECREASE_PREFILL_IDLE.override(
            with_prefill_delayer
        ), envs.SGLANG_PREFILL_DELAYER_MAX_DELAY_PASSES.override(
            100
        ), envs.SGLANG_ENABLE_STRICT_MEM_CHECK_DURING_IDLE.override(
            False
        ):
            process = popen_launch_server(
                model,
                base_url,
                timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
                other_args=other_args,
            )

            try:
                args = get_benchmark_args(
                    base_url=base_url,
                    dataset_name="random",
                    num_prompts=80,
                    random_input_len=4096,
                    random_output_len=128,
                    request_rate=8,
                    tokenizer=model,
                )
                res = run_benchmark(args)
            finally:
                # Always tear the server down, even if the benchmark raised.
                kill_process_tree(process.pid)

        print(f"=== {with_prefill_delayer=} ===")
        print(f"Input throughput: {res['input_throughput']:.2f} token/s")
        print(f"Output throughput: {res['output_throughput']:.2f} token/s")
        # Sanity check: without this the test passes even when the server
        # generates nothing at all.
        self.assertGreater(res["output_throughput"], 0)

    def test_1_dp_attention_throughput_with_prefill_delayer(self):
        self._run_throughput_test(with_prefill_delayer=True)

    def test_2_dp_attention_throughput_without_prefill_delayer(self):
        self._run_throughput_test(with_prefill_delayer=False)


# Allow running this test file directly, outside the run_suite.py runner.
if __name__ == "__main__":
    unittest.main()
Loading