Skip to content

Commit 988be0b

Browse files
feat: Add NPU+GPU async vision pipelining for v1 engine
Layer async NPU vision pre-encoding on top of the FlexMLRT backend: vision scheduler in EngineCore, scheduler deferral when vision is not ready, and gpu_model_runner pre-encoding thread pool. Gated by VLLM_NPU_ASYNC_PIPELINE=1 (default off). Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent f59c1e6 commit 988be0b

8 files changed

Lines changed: 1084 additions & 17 deletions

File tree

vllm/envs.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@
215215
VLLM_VISION_NPU_BACKEND: str = ""
216216
VLLM_VISION_NPU_CACHE: str | None = None
217217
VLLM_VISION_NPU_DEVICE: str | None = None
218+
VLLM_NPU_ASYNC_PIPELINE: bool = False
218219
VLLM_NPU_TIMING: bool = False
219220
VLLM_MORIIO_QP_PER_TRANSFER: int = 1
220221
VLLM_MORIIO_POST_BATCH_SIZE: int = -1
@@ -1754,6 +1755,8 @@ def _get_or_set_default() -> str:
17541755
"VLLM_VISION_NPU_CACHE": lambda: os.getenv("VLLM_VISION_NPU_CACHE"),
17551756
# NPU device name (e.g., "stx" for Strix, "phx" for Phoenix)
17561757
"VLLM_VISION_NPU_DEVICE": lambda: os.getenv("VLLM_VISION_NPU_DEVICE"),
1758+
# Enable async pipelining of NPU vision encoding with GPU LLM inference
1759+
"VLLM_NPU_ASYNC_PIPELINE": lambda: os.getenv("VLLM_NPU_ASYNC_PIPELINE", "0") == "1",
17571760
# Enable NPU timing debug logs
17581761
"VLLM_NPU_TIMING": lambda: os.getenv("VLLM_NPU_TIMING", "0") == "1",
17591762
# Enable CUDA compatibility mode for datacenter GPUs with older

vllm/model_executor/models/vision.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -628,6 +628,7 @@ def get_npu_vision_backend():
628628
629629
Returns:
630630
NPUVisionBackend instance if NPU backend is enabled, None otherwise.
631+
Returns AsyncFlexMLRTVisionBackend if VLLM_NPU_ASYNC_PIPELINE=1.
631632
632633
Raises:
633634
ValueError: If backend name is recognized but initialization fails.
@@ -647,8 +648,14 @@ def get_npu_vision_backend():
647648
)
648649
device_name = envs.VLLM_VISION_NPU_DEVICE or "stx"
649650

650-
from vllm.vision_npu.flexmlrt_backend import FlexMLRTVisionBackend
651+
# Use async backend if pipelining is enabled
652+
if envs.VLLM_NPU_ASYNC_PIPELINE:
653+
from vllm.vision_npu.flexmlrt_backend import AsyncFlexMLRTVisionBackend
651654

652-
return FlexMLRTVisionBackend(model_cache, device_name)
655+
return AsyncFlexMLRTVisionBackend(model_cache, device_name)
656+
else:
657+
from vllm.vision_npu.flexmlrt_backend import FlexMLRTVisionBackend
658+
659+
return FlexMLRTVisionBackend(model_cache, device_name)
653660

654661
return None

vllm/v1/core/sched/scheduler.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,12 @@ def __init__(
103103

104104
# Scheduling constraints.
105105
self.max_num_running_reqs = self.scheduler_config.max_num_seqs
106+
self.enable_hybrid_pipeline = (
107+
envs.VLLM_NPU_ASYNC_PIPELINE
108+
and envs.VLLM_VISION_NPU_BACKEND.lower() == "flexmlrt"
109+
)
110+
# Set during schedule() when a request is deferred for NPU vision.
111+
self.waiting_on_vision_encoding = False
106112
self.max_num_scheduled_tokens = (
107113
self.scheduler_config.max_num_scheduled_tokens
108114
if self.scheduler_config.max_num_scheduled_tokens
@@ -357,6 +363,8 @@ def schedule(self) -> SchedulerOutput:
357363
# chunked prefills, prefix caching, speculative decoding,
358364
# and the "jump decoding" optimization in the future.
359365

366+
self.waiting_on_vision_encoding = False
367+
360368
scheduled_new_reqs: list[Request] = []
361369
scheduled_resumed_reqs: list[Request] = []
362370
scheduled_running_reqs: list[Request] = []
@@ -574,6 +582,25 @@ def schedule(self) -> SchedulerOutput:
574582
request = request_queue.peek_request()
575583
request_id = request.request_id
576584

585+
# HYBRID PIPELINING: defer prefill until NPU vision is ready.
586+
if self.enable_hybrid_pipeline and self.max_num_running_reqs == 1:
587+
needs_vision = (
588+
request.num_computed_tokens == 0 and request.mm_features
589+
)
590+
if needs_vision:
591+
from vllm.v1.engine.core import (
592+
_VISION_PREENCODING_CACHE,
593+
is_vision_preencoding_ready,
594+
)
595+
596+
if not is_vision_preencoding_ready(
597+
request_id, _VISION_PREENCODING_CACHE
598+
):
599+
self.waiting_on_vision_encoding = True
600+
request_queue.pop_request()
601+
step_skipped_waiting.prepend_request(request)
602+
continue
603+
577604
# try to promote blocked statuses while traversing skipped queue.
578605
if self._is_blocked_waiting_status(
579606
request.status
@@ -814,6 +841,19 @@ def schedule(self) -> SchedulerOutput:
814841
continue
815842

816843
self.running.append(request)
844+
845+
if self.enable_hybrid_pipeline and self.max_num_running_reqs == 1:
846+
is_vision_phase = (
847+
request.num_computed_tokens == 0 and request.mm_features
848+
)
849+
phase_name = "VISION" if is_vision_phase else "LLM"
850+
logger.debug(
851+
"[Hybrid Scheduler] Scheduled %s in %s phase (running: %d)",
852+
request.request_id,
853+
phase_name,
854+
len(self.running),
855+
)
856+
817857
if self.log_stats:
818858
request.record_event(
819859
EngineCoreEventType.SCHEDULED, scheduled_timestamp

vllm/v1/engine/core.py

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,34 @@
8585

8686
_R = TypeVar("_R") # Return type for collective_rpc
8787

88+
# Global vision pre-encoding cache (shared between EngineCore and workers)
89+
_VISION_PREENCODING_CACHE: dict[str, Any] = {}
90+
91+
# Busy-loop backoff while deferred requests wait on NPU vision (seconds).
92+
_VISION_POLL_SLEEP_S = 0.02
93+
_DEFAULT_BUSY_LOOP_SLEEP_S = 0.001
94+
95+
96+
def is_vision_preencoding_ready(
97+
request_id: str, cache: dict[str, Any] | None = None
98+
) -> bool:
99+
"""Return True when background vision encoding finished for a request."""
100+
if cache is None:
101+
cache = _VISION_PREENCODING_CACHE
102+
cached = cache.get(request_id)
103+
if cached == "COMPLETED":
104+
return True
105+
if cached is None:
106+
return False
107+
done = getattr(cached, "done", None)
108+
return callable(done) and done()
109+
110+
111+
def _request_has_vision_mm(request: Any) -> bool:
112+
if not request.mm_features:
113+
return False
114+
return any(f.modality in ("image", "video") for f in request.mm_features)
115+
88116

89117
class EngineCore:
90118
"""Inner loop of vLLM's Engine."""
@@ -398,6 +426,59 @@ def log_iteration_details(self, scheduler_output: SchedulerOutput):
398426
)
399427
self._iteration_index += 1
400428

429+
def _schedule_waiting_vision(self) -> None:
430+
"""Vision Scheduler: Proactively trigger pre-encoding for waiting requests.
431+
432+
This is the key to enabling pipelining with max-num-seqs=1:
433+
- Core scheduler only schedules 1 LLM at a time (max-num-seqs=1)
434+
- Vision scheduler processes ALL waiting requests' vision independently
435+
- Request 2's vision can process while Request 1's LLM runs
436+
"""
437+
if not (
438+
envs.VLLM_NPU_ASYNC_PIPELINE
439+
and envs.VLLM_VISION_NPU_BACKEND.lower() == "flexmlrt"
440+
):
441+
return
442+
443+
try:
444+
waiting_requests = list(self.scheduler.waiting) # type: ignore[attr-defined]
445+
except Exception as e:
446+
logger.exception("[Vision Scheduler] Error accessing waiting queue: %s", e)
447+
return
448+
449+
if not waiting_requests:
450+
return
451+
452+
# Skip the scan when every waiting vision request is already submitted.
453+
pending_submit = False
454+
for request in waiting_requests:
455+
if not _request_has_vision_mm(request):
456+
continue
457+
req_id = request.request_id
458+
if req_id not in _VISION_PREENCODING_CACHE:
459+
pending_submit = True
460+
break
461+
462+
if not pending_submit:
463+
return
464+
465+
for request in waiting_requests:
466+
if not _request_has_vision_mm(request):
467+
continue
468+
469+
req_id = request.request_id
470+
if req_id in _VISION_PREENCODING_CACHE:
471+
continue
472+
473+
has_mm_hash = any(mm_feature.mm_hash for mm_feature in request.mm_features)
474+
if not has_mm_hash:
475+
continue
476+
477+
logger.debug(
478+
"[Vision Scheduler] Submitting pre-encoding for request %s", req_id
479+
)
480+
self.model_executor.submit_vision_encoding(req_id, request.mm_features)
481+
401482
def step(self) -> tuple[dict[int, EngineCoreOutputs], bool]:
402483
"""Schedule, execute, and make output.
403484
@@ -409,6 +490,10 @@ def step(self) -> tuple[dict[int, EngineCoreOutputs], bool]:
409490
# or finished and not yet removed from the batch.
410491
if not self.scheduler.has_requests():
411492
return {}, False
493+
494+
# Vision pre-encoding for waiting requests (overlaps with running LLM).
495+
self._schedule_waiting_vision()
496+
412497
scheduler_output = self.scheduler.schedule()
413498
future = self.model_executor.execute_model(scheduler_output, non_block=True)
414499
grammar_output = self.scheduler.get_grammar_bitmask(scheduler_output)
@@ -467,7 +552,12 @@ def step_with_batch_queue(
467552
model_executed = False
468553
deferred_scheduler_output = None
469554
if self.scheduler.has_requests():
555+
# VISION SCHEDULER: Proactively trigger pre-encoding
556+
# Request 2's vision can start while Request 1's LLM runs
557+
self._schedule_waiting_vision()
558+
470559
scheduler_output = self.scheduler.schedule()
560+
471561
with self.log_error_detail(scheduler_output):
472562
exec_future = self.model_executor.execute_model(
473563
scheduler_output, non_block=True
@@ -1211,7 +1301,10 @@ def _process_engine_step(self) -> bool:
12111301
# background threads (like NIXL handshake) to make progress.
12121302
# Without this, the tight polling loop can starve background threads.
12131303
if not model_executed and self.scheduler.has_unfinished_requests():
1214-
time.sleep(0.001)
1304+
if getattr(self.scheduler, "waiting_on_vision_encoding", False):
1305+
time.sleep(_VISION_POLL_SLEEP_S)
1306+
else:
1307+
time.sleep(_DEFAULT_BUSY_LOOP_SLEEP_S)
12151308

12161309
return model_executed
12171310

vllm/v1/engine/output_processor.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import numpy as np
1111
import torch
1212

13+
from vllm.logger import init_logger
1314
from vllm.lora.request import LoRARequest
1415
from vllm.outputs import (
1516
STREAM_FINISHED,
@@ -38,6 +39,8 @@
3839
SchedulerStats,
3940
)
4041

42+
logger = init_logger(__name__)
43+
4144
# shared empty CPU tensor used as a placeholder pooling output
4245
EMPTY_CPU_TENSOR = torch.empty(0, device="cpu")
4346

@@ -678,6 +681,33 @@ def process_outputs(
678681
self._update_stats_from_finished(
679682
req_state, finish_reason, iteration_stats
680683
)
684+
685+
# Debug logging for request timing
686+
if req_state.stats and iteration_stats:
687+
metrics = req_state.stats
688+
e2e_time = (
689+
iteration_stats.iteration_timestamp - metrics.arrival_time
690+
)
691+
queued_time = metrics.scheduled_ts - metrics.queued_ts
692+
prefill_time = metrics.first_token_ts - metrics.scheduled_ts
693+
decode_time = metrics.last_token_ts - metrics.first_token_ts
694+
num_tokens = metrics.num_generation_tokens
695+
tokens_per_sec = (
696+
num_tokens / decode_time if decode_time > 0 else 0
697+
)
698+
logger.debug(
699+
"Request %s: E2E=%.3fs, Queue=%.3fs, "
700+
"Prefill=%.3fs, Decode=%.3fs, "
701+
"Tokens=%d (%.1f tok/s)",
702+
req_state.request_id,
703+
e2e_time,
704+
queued_time,
705+
prefill_time,
706+
decode_time,
707+
num_tokens,
708+
tokens_per_sec,
709+
)
710+
681711
if self.tracing_enabled:
682712
self.do_tracing(engine_core_output, req_state, iteration_stats)
683713

vllm/v1/executor/uniproc_executor.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,17 @@ def check_health(self) -> None:
132132
# it's running.
133133
return
134134

135+
def submit_vision_encoding(self, req_id, mm_features):
136+
"""Submit vision encoding for a waiting request to enable pipelining.
137+
138+
This is called by the Vision Scheduler to proactively start vision processing
139+
for requests that are waiting in the queue (not yet scheduled for LLM).
140+
"""
141+
# Direct call to model_runner for UniProcExecutor (no RPC needed)
142+
if hasattr(self.driver_worker, "model_runner"):
143+
self.driver_worker.model_runner.submit_vision_encoding(req_id, mm_features)
144+
return None
145+
135146
def shutdown(self) -> None:
136147
if worker := self.driver_worker:
137148
worker.shutdown()

0 commit comments

Comments
 (0)