Skip to content

Commit 597be42

Browse files
fix log and scheduling
Signed-off-by: lichang <liangliang.chang@amd.com>
1 parent 8e15457 commit 597be42

4 files changed

Lines changed: 108 additions & 105 deletions

File tree

vllm/v1/core/sched/scheduler.py

Lines changed: 19 additions & 34 deletions
Original file line number | Diff line number | Diff line change
@@ -103,6 +103,12 @@ def __init__(
103103

104104
# Scheduling constraints.
105105
self.max_num_running_reqs = self.scheduler_config.max_num_seqs
106+
self.enable_hybrid_pipeline = (
107+
envs.VLLM_NPU_ASYNC_PIPELINE
108+
and envs.VLLM_VISION_NPU_BACKEND.lower() == "flexmlrt"
109+
)
110+
# Set during schedule() when a request is deferred for NPU vision.
111+
self.waiting_on_vision_encoding = False
106112
self.max_num_scheduled_tokens = (
107113
self.scheduler_config.max_num_scheduled_tokens
108114
if self.scheduler_config.max_num_scheduled_tokens
@@ -357,11 +363,7 @@ def schedule(self) -> SchedulerOutput:
357363
# chunked prefills, prefix caching, speculative decoding,
358364
# and the "jump decoding" optimization in the future.
359365

360-
# Check if hybrid NPU+GPU pipelining is enabled
361-
enable_hybrid_pipeline = (
362-
envs.VLLM_NPU_ASYNC_PIPELINE
363-
and envs.VLLM_VISION_NPU_BACKEND.lower() == "flexmlrt"
364-
)
366+
self.waiting_on_vision_encoding = False
365367

366368
scheduled_new_reqs: list[Request] = []
367369
scheduled_resumed_reqs: list[Request] = []
@@ -580,37 +582,21 @@ def schedule(self) -> SchedulerOutput:
580582
request = request_queue.peek_request()
581583
request_id = request.request_id
582584

583-
# HYBRID PIPELINING: Check if vision is ready BEFORE processing
584-
if enable_hybrid_pipeline and self.max_num_running_reqs == 1:
585+
# HYBRID PIPELINING: defer prefill until NPU vision is ready.
586+
if self.enable_hybrid_pipeline and self.max_num_running_reqs == 1:
585587
needs_vision = (
586588
request.num_computed_tokens == 0 and request.mm_features
587589
)
588590
if needs_vision:
589-
# Check if vision encoding is complete
590-
from vllm.v1.engine.core import _VISION_PREENCODING_CACHE
591-
592-
vision_ready = False
593-
if request_id in _VISION_PREENCODING_CACHE:
594-
cached = _VISION_PREENCODING_CACHE[request_id]
595-
if (
596-
cached == "COMPLETED"
597-
or hasattr(cached, "done")
598-
and cached.done()
599-
):
600-
vision_ready = True
601-
602-
immediate_key = f"immediate_{request_id}"
603-
if (
604-
not vision_ready
605-
and immediate_key in _VISION_PREENCODING_CACHE
606-
):
607-
cached = _VISION_PREENCODING_CACHE[immediate_key]
608-
if hasattr(cached, "done") and cached.done():
609-
vision_ready = True
591+
from vllm.v1.engine.core import (
592+
_VISION_PREENCODING_CACHE,
593+
is_vision_preencoding_ready,
594+
)
610595

611-
if not vision_ready:
612-
# Vision not ready - skip this request
613-
# Vision Scheduler will proactively process waiting requests
596+
if not is_vision_preencoding_ready(
597+
request_id, _VISION_PREENCODING_CACHE
598+
):
599+
self.waiting_on_vision_encoding = True
614600
request_queue.pop_request()
615601
step_skipped_waiting.prepend_request(request)
616602
continue
@@ -856,13 +842,12 @@ def schedule(self) -> SchedulerOutput:
856842

857843
self.running.append(request)
858844

859-
# Log hybrid scheduler decisions
860-
if enable_hybrid_pipeline and self.max_num_running_reqs == 1:
845+
if self.enable_hybrid_pipeline and self.max_num_running_reqs == 1:
861846
is_vision_phase = (
862847
request.num_computed_tokens == 0 and request.mm_features
863848
)
864849
phase_name = "VISION" if is_vision_phase else "LLM"
865-
logger.info(
850+
logger.debug(
866851
"[Hybrid Scheduler] Scheduled %s in %s phase (running: %d)",
867852
request.request_id,
868853
phase_name,

vllm/v1/engine/core.py

Lines changed: 53 additions & 38 deletions
Original file line number | Diff line number | Diff line change
@@ -88,6 +88,31 @@
8888
# Global vision pre-encoding cache (shared between EngineCore and workers)
8989
_VISION_PREENCODING_CACHE: dict[str, Any] = {}
9090

91+
# Busy-loop backoff while deferred requests wait on NPU vision (seconds).
92+
_VISION_POLL_SLEEP_S = 0.02
93+
_DEFAULT_BUSY_LOOP_SLEEP_S = 0.001
94+
95+
96+
def is_vision_preencoding_ready(
97+
request_id: str, cache: dict[str, Any] | None = None
98+
) -> bool:
99+
"""Return True when background vision encoding finished for a request."""
100+
if cache is None:
101+
cache = _VISION_PREENCODING_CACHE
102+
cached = cache.get(request_id)
103+
if cached == "COMPLETED":
104+
return True
105+
if cached is None:
106+
return False
107+
done = getattr(cached, "done", None)
108+
return callable(done) and done()
109+
110+
111+
def _request_has_vision_mm(request: Any) -> bool:
    """Report whether a request has an image or video multimodal feature.

    Returns ``False`` when ``request.mm_features`` is empty or ``None``;
    otherwise ``True`` as soon as one feature's ``modality`` is ``"image"``
    or ``"video"``.
    """
    features = request.mm_features
    if features:
        for feature in features:
            if feature.modality in ("image", "video"):
                return True
    return False
115+
91116

92117
class EngineCore:
93118
"""Inner loop of vLLM's Engine."""
@@ -409,57 +434,47 @@ def _schedule_waiting_vision(self) -> None:
409434
- Vision scheduler processes ALL waiting requests' vision independently
410435
- Request 2's vision can process while Request 1's LLM runs
411436
"""
412-
# Only enable for NPU async pipelining
413-
if not envs.VLLM_NPU_ASYNC_PIPELINE:
437+
if not (
438+
envs.VLLM_NPU_ASYNC_PIPELINE
439+
and envs.VLLM_VISION_NPU_BACKEND.lower() == "flexmlrt"
440+
):
414441
return
415442

416-
# Import the pre-encoding cache
417-
from vllm.v1.engine.core import _VISION_PREENCODING_CACHE
418-
419-
# Get waiting requests from scheduler
420443
try:
421444
waiting_requests = list(self.scheduler.waiting) # type: ignore[attr-defined]
422-
# Only log when there are actually waiting requests to process
423-
if len(waiting_requests) > 0:
424-
logger.info(
425-
"[Vision Scheduler] Found %d waiting requests",
426-
len(waiting_requests),
427-
)
428445
except Exception as e:
429446
logger.exception("[Vision Scheduler] Error accessing waiting queue: %s", e)
430447
return
431448

432-
for idx, request in enumerate(waiting_requests):
433-
# Only process requests with multimodal features
434-
if not request.mm_features:
449+
if not waiting_requests:
450+
return
451+
452+
# Skip the scan when every waiting vision request is already submitted.
453+
pending_submit = False
454+
for request in waiting_requests:
455+
if not _request_has_vision_mm(request):
435456
continue
457+
req_id = request.request_id
458+
if req_id not in _VISION_PREENCODING_CACHE:
459+
pending_submit = True
460+
break
436461

437-
# Check if this is a vision request
438-
has_vision = any(
439-
f.modality in ("image", "video") for f in request.mm_features
440-
)
441-
if not has_vision:
462+
if not pending_submit:
463+
return
464+
465+
for request in waiting_requests:
466+
if not _request_has_vision_mm(request):
442467
continue
443468

444469
req_id = request.request_id
445-
446-
# Check if already in cache (in progress or completed)
447470
if req_id in _VISION_PREENCODING_CACHE:
448471
continue
449472

450-
# Compute mm_hash for this request
451-
mm_hash = None
452-
for mm_feature in request.mm_features:
453-
if mm_feature.mm_hash:
454-
mm_hash = mm_feature.mm_hash
455-
break
456-
457-
if mm_hash is None:
473+
has_mm_hash = any(mm_feature.mm_hash for mm_feature in request.mm_features)
474+
if not has_mm_hash:
458475
continue
459476

460-
# Submit vision encoding for this waiting request
461-
# Pass only serializable data (req_id and mm_features) for RPC
462-
logger.info(
477+
logger.debug(
463478
"[Vision Scheduler] Submitting pre-encoding for request %s", req_id
464479
)
465480
self.model_executor.submit_vision_encoding(req_id, request.mm_features)
@@ -476,11 +491,8 @@ def step(self) -> tuple[dict[int, EngineCoreOutputs], bool]:
476491
if not self.scheduler.has_requests():
477492
return {}, False
478493

479-
# VISION SCHEDULER: Proactively trigger pre-encoding for waiting requests
480-
# Request 2's vision can start while Request 1's LLM runs
481-
logger.info("[EngineCore] About to call _schedule_waiting_vision()")
494+
# Vision pre-encoding for waiting requests (overlaps with running LLM).
482495
self._schedule_waiting_vision()
483-
logger.info("[EngineCore] Returned from _schedule_waiting_vision()")
484496

485497
scheduler_output = self.scheduler.schedule()
486498
future = self.model_executor.execute_model(scheduler_output, non_block=True)
@@ -1289,7 +1301,10 @@ def _process_engine_step(self) -> bool:
12891301
# background threads (like NIXL handshake) to make progress.
12901302
# Without this, the tight polling loop can starve background threads.
12911303
if not model_executed and self.scheduler.has_unfinished_requests():
1292-
time.sleep(0.001)
1304+
if getattr(self.scheduler, "waiting_on_vision_encoding", False):
1305+
time.sleep(_VISION_POLL_SLEEP_S)
1306+
else:
1307+
time.sleep(_DEFAULT_BUSY_LOOP_SLEEP_S)
12931308

12941309
return model_executed
12951310

0 commit comments

Comments
 (0)