perf(multimodal): reduce video decode, Qwen preprocess, and TokenSpeed handoff overhead#1820
perf(multimodal): reduce video decode, Qwen preprocess, and TokenSpeed handoff overhead#1820yechank-nvidia wants to merge 3 commits into
Conversation
There was a problem hiding this comment.
Code Review
This pull request optimizes video decoding with OpenCV by sequentially grabbing intervening frames instead of seeking directly to each sampled frame. A review comment identified a critical bug where decoded_pos is not updated if capture.read() fails, which would cause subsequent frame decoding to go out of sync. The reviewer provided a code suggestion to correctly update the decoder position and handle grab failures explicitly.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
📝 WalkthroughWalkthroughMultimodal media decoding now uses async blocking helpers, path-aware video decoding, and duration-aware FFmpeg/OpenCV selection. Qwen vision preprocessing shifts to borrowed image refs and raw RGB patchification. The gateway, TokenSpeed servicer, and SHM writer update their preprocessing, serialization, and validation paths, with matching tests and docs. ChangesMultimodal Pipeline and TokenSpeed Transport
Sequence Diagram(s)sequenceDiagram
participant process_multimodal_parts
participant assemble_tokenspeed
participant write_tokenspeed_shm_with
participant CountingWriter
process_multimodal_parts->>assemble_tokenspeed: pass preprocessed encoder inputs
assemble_tokenspeed->>write_tokenspeed_shm_with: serialize TokenSpeed SHM payload
write_tokenspeed_shm_with->>CountingWriter: write packed bytes
CountingWriter-->>write_tokenspeed_shm_with: bytes_written
write_tokenspeed_shm_with-->>assemble_tokenspeed: return ShmHandle
Estimated code review effort🎯 5 (Critical) | ⏱️ ~90+ minutes Possibly related issues
Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
6215045 to
e99ae9e
Compare
…rame seek decode_video_with_opencv_file called capture.set(CAP_PROP_POS_FRAMES, idx) for every sampled frame. OpenCV flushes/re-seeks the decoder on each POS_FRAMES set (~10 ms/frame even for adjacent frames), so a 20-frame 2 fps clip spent ~195 ms in decode -- the largest single component of multimodal video TTFT. Advance to each sampled frame by sequentially grabbing the intervening frames (grab() decodes without retrieving, ~1-2 ms/frame) and reading only the sampled ones. This avoids a decoder flush/seek for every sampled frame. Measured (Qwen3.5-4B, 20-frame 512x512 clip): decode ~197 ms -> ~69 ms (2.8x), end-to-end video request ~357 ms -> ~228 ms (~36%). Verified bit-exact against the previous per-frame seek: identical decoded pixels on both dense and sparse (non-keyframe) sampling, and identical model output on a numbered-frame video, so accuracy is unchanged. Signed-off-by: yechank-nvidia <161688079+yechank-nvidia@users.noreply.github.com>
Move more media decode work off the async runtime and add borrowed/parallel Qwen image and video preprocessing paths. Signed-off-by: yechank-nvidia <161688079+yechank-nvidia@users.noreply.github.com>
Pack TokenSpeed encoder inputs into offset SHM segments, preserve placeholder spans for faster worker handoff, and default video tensor transport to auto. Signed-off-by: yechank-nvidia <161688079+yechank-nvidia@users.noreply.github.com>
e99ae9e to
60d6d7b
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 60d6d7b68f
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| "fs", | ||
| "rt-multi-thread", | ||
| "process", | ||
| "time", |
There was a problem hiding this comment.
Enable Tokio io-util for pipe reads
This crate now imports tokio::io::AsyncReadExt and calls read_to_end on the ffmpeg stdout/stderr pipes, but the llm-multimodal Tokio dependency still enables only sync, fs, rt-multi-thread, process, and time. When this crate is checked or consumed without another workspace target enabling Tokio's full/io-util feature, the new extension trait is not available and video decoding builds fail; add io-util to this feature list.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Actionable comments posted: 6
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@crates/multimodal/src/media.rs`:
- Around line 323-332: The `read_and_hash` and `decode_video_frames_from_path`
paths in `media.rs` are operating on different snapshots of the same video file,
which can make `VideoClip.raw_bytes` and `VideoClip.hash` disagree with the
decoded frames. Change the `decode` flow so both hashing and frame decoding use
one immutable file snapshot or a stable descriptor/snapshot created once, rather
than reopening `canonical` separately. Keep the fix localized around
`read_and_hash`, `decode_video_frames_from_path`, and the `tokio::try_join!`
call so the returned `VideoClip` always reflects a single consistent file
version.
- Around line 1039-1043: The missing-binary branch in the `command.spawn()`
error handling still hardcodes a `video_url inputs` message, but
`MediaConnectorError::VideoDecode` is now used by `File`, `DataUrl`, and
`InlineBytes` paths too. Update the error text in this `map_err` block to use a
source-agnostic decode message tied to the `program` being spawned, so missing
`ffmpeg`/`ffprobe` guidance applies to all decode inputs.
In `@crates/multimodal/src/vision/processors/qwen_vl_base.rs`:
- Around line 1361-1398: The new test only covers the serial resize/preprocess
path and does not verify the image-parallel branch. Add a parity test around
QwenVLProcessorBase::patchify_image_rgb_block_band that exercises the same
resize/normalize inputs and compares its output against the existing serial
patchify/preprocess path, so the parallel image fast path gets the same
regression coverage as the video path.
In `@crates/multimodal/src/vision/transforms.rs`:
- Around line 446-456: The row-threshold check in par_threads can overflow when
computing 2 * cfg.min_rows_per_thread from
par_config/SMG_MM_PREPROCESS_PAR_MIN_ROWS. Update the early-return condition to
use saturating arithmetic for that comparison so it cannot panic in debug/tests
or wrap in release, while keeping the existing behavior of returning 1 for small
workloads.
In `@grpc_servicer/smg_grpc_servicer/tokenspeed/servicer.py`:
- Around line 1192-1193: The cast fallback in
TokenSpeedSchedulerServicer._feature_from_proto can trigger _tensor_from_proto
to unlink a shared packed SHM segment too early when multiple packed inputs
share the same backing file. Update the fallback path so it does not destroy
shared SHM before all packed items are read—either prevent packing when cast_to
is required on the servicer side, or change _tensor_payload_bytes_from_shm and
related SHM handling to defer unlinking until every offset/reference in the
shared segment has been materialized.
In `@model_gateway/src/routers/grpc/multimodal.rs`:
- Around line 1161-1186: Reject placeholder/item count mismatches before
constructing TokenSpeed items in the multimodal path: in the loop that builds
`pending_items`, stop relying on
`mm_placeholders_by_item.next().unwrap_or_default()` and instead validate that
`placeholders_for_items(&intermediate.placeholders, patch_offsets)` yields
exactly `item_count` groups before iteration. If the counts differ, return an
error early from the surrounding multimodal serialization flow so
`encoder_input_for_item` and `serialize_model_specific_for_item` are not used to
build partially invalid `PendingTokenSpeedItem` values.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: f1d34efa-d749-452c-9e07-649de652678d
📒 Files selected for processing (12)
crates/multimodal/Cargo.tomlcrates/multimodal/src/media.rscrates/multimodal/src/vision/processor.rscrates/multimodal/src/vision/processors/qwen2_vl.rscrates/multimodal/src/vision/processors/qwen3_vl.rscrates/multimodal/src/vision/processors/qwen_vl_base.rscrates/multimodal/src/vision/transforms.rsdocs/reference/configuration.mdgrpc_servicer/smg_grpc_servicer/tokenspeed/servicer.pygrpc_servicer/tests/test_tokenspeed_multimodal_shm.pymodel_gateway/src/routers/grpc/multimodal.rsmodel_gateway/src/routers/grpc/proto_wrapper.rs
| let read_and_hash = async { | ||
| let bytes = Bytes::from(fs::read(&canonical).await?); | ||
| let bytes_for_hash = bytes.clone(); | ||
| let hash = task::spawn_blocking(move || crate::hasher::hash_video(&bytes_for_hash)) | ||
| .await | ||
| .map_err(MediaConnectorError::Blocking)?; | ||
| Ok::<_, MediaConnectorError>((bytes, hash)) | ||
| }; | ||
| let decode = decode_video_frames_from_path(&canonical, input_bytes, None, cfg); | ||
| let ((bytes, hash), decoded) = tokio::try_join!(read_and_hash, decode)?; |
There was a problem hiding this comment.
🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win
Decode and hash the same file snapshot.
Lines 323-332 read/hash canonical in one task while decoding the path in another. If that file is modified between those operations, the returned VideoClip can contain frames from one version and raw_bytes/hash from another. Use one immutable snapshot for both operations, or decode through a stable descriptor/snapshot instead of reopening the path twice.
Based on crates/multimodal/src/types.rs, VideoClip stores decoded frames alongside the original raw_bytes and hash, so those fields need to describe the same content.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/multimodal/src/media.rs` around lines 323 - 332, The `read_and_hash`
and `decode_video_frames_from_path` paths in `media.rs` are operating on
different snapshots of the same video file, which can make `VideoClip.raw_bytes`
and `VideoClip.hash` disagree with the decoded frames. Change the `decode` flow
so both hashing and frame decoding use one immutable file snapshot or a stable
descriptor/snapshot created once, rather than reopening `canonical` separately.
Keep the fix localized around `read_and_hash`, `decode_video_frames_from_path`,
and the `tokio::try_join!` call so the returned `VideoClip` always reflects a
single consistent file version.
| command.spawn().map_err(|e| { | ||
| if e.kind() == std::io::ErrorKind::NotFound { | ||
| MediaConnectorError::VideoDecode(format!( | ||
| "{program} executable not found; install {program} to decode video_url inputs" | ||
| )) |
There was a problem hiding this comment.
🎯 Functional Correctness | 🟡 Minor | ⚡ Quick win
Generalize the missing-binary error message.
Line 1042 still says video_url inputs, but this helper now backs File, DataUrl, and InlineBytes decode paths too. When ffmpeg/ffprobe is missing, that error points operators at the wrong source type.
Suggested fix
- "{program} executable not found; install {program} to decode video_url inputs"
+ "{program} executable not found; install {program} to decode video inputs"📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| command.spawn().map_err(|e| { | |
| if e.kind() == std::io::ErrorKind::NotFound { | |
| MediaConnectorError::VideoDecode(format!( | |
| "{program} executable not found; install {program} to decode video_url inputs" | |
| )) | |
| command.spawn().map_err(|e| { | |
| if e.kind() == std::io::ErrorKind::NotFound { | |
| MediaConnectorError::VideoDecode(format!( | |
| "{program} executable not found; install {program} to decode video inputs" | |
| )) |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/multimodal/src/media.rs` around lines 1039 - 1043, The missing-binary
branch in the `command.spawn()` error handling still hardcodes a `video_url
inputs` message, but `MediaConnectorError::VideoDecode` is now used by `File`,
`DataUrl`, and `InlineBytes` paths too. Update the error text in this `map_err`
block to use a source-agnostic decode message tied to the `program` being
spawned, so missing `ffmpeg`/`ffprobe` guidance applies to all decode inputs.
| #[test] | ||
| fn test_preprocess_image_matches_tensor_patchify_with_resize() { | ||
| let processor = QwenVLProcessorBase::new(create_video_test_config()); | ||
| let config = PreProcessorConfig { | ||
| image_mean: Some(processor.default_mean().to_vec()), | ||
| image_std: Some(processor.default_std().to_vec()), | ||
| ..Default::default() | ||
| }; | ||
| let image = create_sized_pattern_frame(7, 9, 3); | ||
| let (target_h, target_w) = processor.smart_resize(9, 7).unwrap(); | ||
| assert!( | ||
| (target_w as u32, target_h as u32) != (7u32, 9u32), | ||
| "test must force a resize; target {target_w}x{target_h} should differ from 7x9" | ||
| ); | ||
|
|
||
| let result = processor | ||
| .preprocess(std::slice::from_ref(&image), &config) | ||
| .unwrap(); | ||
| let actual = result.encoder_input.as_slice_memory_order().unwrap(); | ||
|
|
||
| let resized = resize_bicubic_pil(&image, target_w as u32, target_h as u32); | ||
| let tensor = | ||
| to_tensor_and_normalize(&resized, &processor.default_mean(), &processor.default_std()); | ||
| let (grid_t, grid_h, grid_w) = processor.calculate_grid_thw(target_h, target_w, 1); | ||
| let mut expected = Vec::new(); | ||
| processor | ||
| .patchify_into(&tensor, grid_t, grid_h, grid_w, &mut expected) | ||
| .unwrap(); | ||
|
|
||
| assert_eq!(actual.len(), expected.len()); | ||
| for (idx, (&got, &want)) in actual.iter().zip(expected.iter()).enumerate() { | ||
| assert_eq!( | ||
| got.to_bits(), | ||
| want.to_bits(), | ||
| "image patch value differs at index {idx}: got {got}, want {want}" | ||
| ); | ||
| } | ||
| } |
There was a problem hiding this comment.
📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win
Add an image-parallel parity test.
This new test validates the resize path, but it stays on the serial branch. The video fast path already has a parallel-block regression guard; patchify_image_rgb_block_band should get the same coverage.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/multimodal/src/vision/processors/qwen_vl_base.rs` around lines 1361 -
1398, The new test only covers the serial resize/preprocess path and does not
verify the image-parallel branch. Add a parity test around
QwenVLProcessorBase::patchify_image_rgb_block_band that exercises the same
resize/normalize inputs and compares its output against the existing serial
patchify/preprocess path, so the parallel image fast path gets the same
regression coverage as the video path.
| pub(crate) fn par_threads(out_bytes: usize, out_rows: usize) -> usize { | ||
| const PAR_MIN_BYTES: usize = 1 << 19; // ~512 KiB output; below this, serial | ||
| const MIN_ROWS_PER_THREAD: usize = 32; // keep enough work per thread | ||
| const MAX_THREADS: usize = 32; // spawning hundreds of threads costs more than it saves | ||
| if out_bytes < PAR_MIN_BYTES || out_rows < 2 * MIN_ROWS_PER_THREAD { | ||
| let cfg = par_config(); | ||
| if out_bytes < cfg.min_bytes || out_rows < 2 * cfg.min_rows_per_thread { | ||
| return 1; | ||
| } | ||
| let avail = std::thread::available_parallelism() | ||
| .map(|n| n.get()) | ||
| .unwrap_or(1); | ||
| (out_rows / MIN_ROWS_PER_THREAD) | ||
| static AVAILABLE_PARALLELISM: OnceLock<usize> = OnceLock::new(); | ||
| let avail = *AVAILABLE_PARALLELISM | ||
| .get_or_init(|| std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1)); | ||
| (out_rows / cfg.min_rows_per_thread) | ||
| .min(avail) | ||
| .clamp(1, MAX_THREADS) | ||
| .clamp(1, cfg.max_threads) |
There was a problem hiding this comment.
🩺 Stability & Availability | 🟡 Minor
🧩 Analysis chain
🏁 Script executed:
sed -n '1,260p' crates/multimodal/src/vision/transforms.rs && printf '\n--- CUT ---\n' && sed -n '260,520p' crates/multimodal/src/vision/transforms.rsRepository: lightseekorg/smg
Length of output: 18396
🏁 Script executed:
python3 - <<'PY'
from pathlib import Path
p = Path('crates/multimodal/src/vision/transforms.rs')
text = p.read_text()
for needle in ['par_config', 'env_usize', 'par_threads', 'SMG_MM_PREPROCESS_PAR_MIN_ROWS']:
print(f'\n### {needle}')
idx = text.find(needle)
if idx == -1:
print('not found')
continue
start = max(0, text.rfind('\n', 0, idx-400))
end = min(len(text), text.find('\n', idx+1200) if text.find('\n', idx+1200)!=-1 else len(text))
print(text[start:end])
PYRepository: lightseekorg/smg
Length of output: 6854
🏁 Script executed:
git grep -n "pub(crate) fn par_threads\|fn par_config\|env_usize\|SMG_MM_PREPROCESS_PAR_MIN_ROWS" -- crates/multimodal/src/vision/transforms.rsRepository: lightseekorg/smg
Length of output: 838
Use saturating arithmetic for the row threshold. cfg.min_rows_per_thread comes from SMG_MM_PREPROCESS_PAR_MIN_ROWS, so 2 * cfg.min_rows_per_thread can overflow here and panic in debug/tests or wrap in release.
Suggested fix
- if out_bytes < cfg.min_bytes || out_rows < 2 * cfg.min_rows_per_thread {
+ if out_bytes < cfg.min_bytes
+ || out_rows < cfg.min_rows_per_thread.saturating_mul(2)
+ {
return 1;
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| pub(crate) fn par_threads(out_bytes: usize, out_rows: usize) -> usize { | |
| const PAR_MIN_BYTES: usize = 1 << 19; // ~512 KiB output; below this, serial | |
| const MIN_ROWS_PER_THREAD: usize = 32; // keep enough work per thread | |
| const MAX_THREADS: usize = 32; // spawning hundreds of threads costs more than it saves | |
| if out_bytes < PAR_MIN_BYTES || out_rows < 2 * MIN_ROWS_PER_THREAD { | |
| let cfg = par_config(); | |
| if out_bytes < cfg.min_bytes || out_rows < 2 * cfg.min_rows_per_thread { | |
| return 1; | |
| } | |
| let avail = std::thread::available_parallelism() | |
| .map(|n| n.get()) | |
| .unwrap_or(1); | |
| (out_rows / MIN_ROWS_PER_THREAD) | |
| static AVAILABLE_PARALLELISM: OnceLock<usize> = OnceLock::new(); | |
| let avail = *AVAILABLE_PARALLELISM | |
| .get_or_init(|| std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1)); | |
| (out_rows / cfg.min_rows_per_thread) | |
| .min(avail) | |
| .clamp(1, MAX_THREADS) | |
| .clamp(1, cfg.max_threads) | |
| pub(crate) fn par_threads(out_bytes: usize, out_rows: usize) -> usize { | |
| let cfg = par_config(); | |
| if out_bytes < cfg.min_bytes | |
| || out_rows < cfg.min_rows_per_thread.saturating_mul(2) | |
| { | |
| return 1; | |
| } | |
| static AVAILABLE_PARALLELISM: OnceLock<usize> = OnceLock::new(); | |
| let avail = *AVAILABLE_PARALLELISM | |
| .get_or_init(|| std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1)); | |
| (out_rows / cfg.min_rows_per_thread) | |
| .min(avail) | |
| .clamp(1, cfg.max_threads) |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/multimodal/src/vision/transforms.rs` around lines 446 - 456, The
row-threshold check in par_threads can overflow when computing 2 *
cfg.min_rows_per_thread from par_config/SMG_MM_PREPROCESS_PAR_MIN_ROWS. Update
the early-return condition to use saturating arithmetic for that comparison so
it cannot panic in debug/tests or wrap in release, while keeping the existing
behavior of returning 1 for small workloads.
| if cast_to is not None and dtype != cast_to: | ||
| return TokenSpeedSchedulerServicer._tensor_from_proto(tensor_data, cast_to=cast_to) |
There was a problem hiding this comment.
🩺 Stability & Availability | 🟠 Major | 🏗️ Heavy lift
Do not unlink shared packed SHM segments during cast fallback.
When packed encoder inputs share one SHM file, a dtype mismatch makes _feature_from_proto materialize via _tensor_from_proto; _tensor_payload_bytes_from_shm can then unlink the shared segment after the first item, causing later offsets in the same segment to fail. Either avoid packing when a servicer-side cast is required, or make the fallback read all shared handles before unlinking/refcount the segment.
Also applies to: 1239-1246
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@grpc_servicer/smg_grpc_servicer/tokenspeed/servicer.py` around lines 1192 -
1193, The cast fallback in TokenSpeedSchedulerServicer._feature_from_proto can
trigger _tensor_from_proto to unlink a shared packed SHM segment too early when
multiple packed inputs share the same backing file. Update the fallback path so
it does not destroy shared SHM before all packed items are read—either prevent
packing when cast_to is required on the servicer side, or change
_tensor_payload_bytes_from_shm and related SHM handling to defer unlinking until
every offset/reference in the shared segment has been materialized.
| let mut mm_placeholders_by_item = | ||
| placeholders_for_items(&intermediate.placeholders, patch_offsets).into_iter(); | ||
| let mut pending_items: Vec<PendingTokenSpeedItem<'_>> = Vec::with_capacity(item_count); | ||
| for item_index in 0..item_count { | ||
| let item_encoder_input = match encoder_input_for_item( | ||
| &intermediate.preprocessed, | ||
| &intermediate.field_layouts, | ||
| &flat_spans, | ||
| item_index, | ||
| ) { | ||
| Ok(value) => value, | ||
| Err(error) => { | ||
| cleanup_tokenspeed_items_encoder_shm(&items, None); | ||
| return Err(error); | ||
| } | ||
| Err(error) => return Err(error), | ||
| }; | ||
| let encoder_input_started = Instant::now(); | ||
| let encoder_input = serialize_array_as_tokenspeed_tensor( | ||
| &item_encoder_input, | ||
| &encoder_input_dtype, | ||
| shm_enabled, | ||
| ); | ||
| let encoder_input_serialize_ms = encoder_input_started.elapsed().as_secs_f64() * 1000.0; | ||
| let model_specific_started = Instant::now(); | ||
| let model_specific_started = log_timing.then(Instant::now); | ||
| let model_specific_tensors = match serialize_model_specific_for_item( | ||
| &intermediate.preprocessed.model_specific, | ||
| &intermediate.field_layouts, | ||
| &flat_spans, | ||
| item_index, | ||
| ) { | ||
| Ok(value) => value, | ||
| Err(error) => { | ||
| // `encoder_input` (possibly SHM) was created for this item but the | ||
| // item isn't built; clean it plus all prior items. | ||
| cleanup_tokenspeed_items_encoder_shm(&items, Some(&encoder_input)); | ||
| return Err(error); | ||
| } | ||
| Err(error) => return Err(error), | ||
| }; | ||
| let model_specific_serialize_ms = model_specific_started.elapsed().as_secs_f64() * 1000.0; | ||
| let mm_placeholders = | ||
| placeholders_for_item(item_index, &intermediate.placeholders, &patch_offsets); | ||
| let content_hash = content_hash_for_item(intermediate.modality, &intermediate, item_index); | ||
| let model_specific_serialize_ms = | ||
| model_specific_started.map(|started| started.elapsed().as_secs_f64() * 1000.0); | ||
| let mm_placeholders = mm_placeholders_by_item.next().unwrap_or_default(); |
There was a problem hiding this comment.
🎯 Functional Correctness | 🟠 Major | ⚡ Quick win
Reject placeholder/item count mismatches before building TokenSpeed items.
unwrap_or_default() silently emits an item with empty placeholders when placeholders_for_items(...) returns fewer groups than item_count; the TokenSpeed servicer rejects empty placeholder lists, and extra groups would be dropped. Validate the group count before iterating.
Suggested fix
- let mut mm_placeholders_by_item =
- placeholders_for_items(&intermediate.placeholders, patch_offsets).into_iter();
+ let mm_placeholders_by_item_vec =
+ placeholders_for_items(&intermediate.placeholders, patch_offsets);
+ anyhow::ensure!(
+ mm_placeholders_by_item_vec.len() == item_count,
+ "TokenSpeed placeholder item count mismatch: placeholders={}, items={item_count}",
+ mm_placeholders_by_item_vec.len(),
+ );
+ let mut mm_placeholders_by_item = mm_placeholders_by_item_vec.into_iter();
@@
- let mm_placeholders = mm_placeholders_by_item.next().unwrap_or_default();
+ let mm_placeholders = mm_placeholders_by_item
+ .next()
+ .expect("placeholder item count validated above");📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let mut mm_placeholders_by_item = | |
| placeholders_for_items(&intermediate.placeholders, patch_offsets).into_iter(); | |
| let mut pending_items: Vec<PendingTokenSpeedItem<'_>> = Vec::with_capacity(item_count); | |
| for item_index in 0..item_count { | |
| let item_encoder_input = match encoder_input_for_item( | |
| &intermediate.preprocessed, | |
| &intermediate.field_layouts, | |
| &flat_spans, | |
| item_index, | |
| ) { | |
| Ok(value) => value, | |
| Err(error) => { | |
| cleanup_tokenspeed_items_encoder_shm(&items, None); | |
| return Err(error); | |
| } | |
| Err(error) => return Err(error), | |
| }; | |
| let encoder_input_started = Instant::now(); | |
| let encoder_input = serialize_array_as_tokenspeed_tensor( | |
| &item_encoder_input, | |
| &encoder_input_dtype, | |
| shm_enabled, | |
| ); | |
| let encoder_input_serialize_ms = encoder_input_started.elapsed().as_secs_f64() * 1000.0; | |
| let model_specific_started = Instant::now(); | |
| let model_specific_started = log_timing.then(Instant::now); | |
| let model_specific_tensors = match serialize_model_specific_for_item( | |
| &intermediate.preprocessed.model_specific, | |
| &intermediate.field_layouts, | |
| &flat_spans, | |
| item_index, | |
| ) { | |
| Ok(value) => value, | |
| Err(error) => { | |
| // `encoder_input` (possibly SHM) was created for this item but the | |
| // item isn't built; clean it plus all prior items. | |
| cleanup_tokenspeed_items_encoder_shm(&items, Some(&encoder_input)); | |
| return Err(error); | |
| } | |
| Err(error) => return Err(error), | |
| }; | |
| let model_specific_serialize_ms = model_specific_started.elapsed().as_secs_f64() * 1000.0; | |
| let mm_placeholders = | |
| placeholders_for_item(item_index, &intermediate.placeholders, &patch_offsets); | |
| let content_hash = content_hash_for_item(intermediate.modality, &intermediate, item_index); | |
| let model_specific_serialize_ms = | |
| model_specific_started.map(|started| started.elapsed().as_secs_f64() * 1000.0); | |
| let mm_placeholders = mm_placeholders_by_item.next().unwrap_or_default(); | |
| let mm_placeholders_by_item_vec = | |
| placeholders_for_items(&intermediate.placeholders, patch_offsets); | |
| anyhow::ensure!( | |
| mm_placeholders_by_item_vec.len() == item_count, | |
| "TokenSpeed placeholder item count mismatch: placeholders={}, items={item_count}", | |
| mm_placeholders_by_item_vec.len(), | |
| ); | |
| let mut mm_placeholders_by_item = mm_placeholders_by_item_vec.into_iter(); | |
| let mut pending_items: Vec<PendingTokenSpeedItem<'_>> = Vec::with_capacity(item_count); | |
| for item_index in 0..item_count { | |
| let item_encoder_input = match encoder_input_for_item( | |
| &intermediate.preprocessed, | |
| &intermediate.field_layouts, | |
| &flat_spans, | |
| item_index, | |
| ) { | |
| Ok(value) => value, | |
| Err(error) => return Err(error), | |
| }; | |
| let model_specific_started = log_timing.then(Instant::now); | |
| let model_specific_tensors = match serialize_model_specific_for_item( | |
| &intermediate.preprocessed.model_specific, | |
| &intermediate.field_layouts, | |
| &flat_spans, | |
| item_index, | |
| ) { | |
| Ok(value) => value, | |
| Err(error) => return Err(error), | |
| }; | |
| let model_specific_serialize_ms = | |
| model_specific_started.map(|started| started.elapsed().as_secs_f64() * 1000.0); | |
| let mm_placeholders = mm_placeholders_by_item | |
| .next() | |
| .expect("placeholder item count validated above"); |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@model_gateway/src/routers/grpc/multimodal.rs` around lines 1161 - 1186,
Reject placeholder/item count mismatches before constructing TokenSpeed items in
the multimodal path: in the loop that builds `pending_items`, stop relying on
`mm_placeholders_by_item.next().unwrap_or_default()` and instead validate that
`placeholders_for_items(&intermediate.placeholders, patch_offsets)` yields
exactly `item_count` groups before iteration. If the counts differ, return an
error early from the surrounding multimodal serialization flow so
`encoder_input_for_item` and `serialize_model_specific_for_item` are not used to
build partially invalid `PendingTokenSpeedItem` values.
Description
Problem
Multimodal video requests still spend too much time outside model execution:
decoder repeatedly.
DynamicImageand extra tensor copies before patchification.pass inline.
Solution
grab()-ing intervening frames andread()-ing onlysampled frames.
and parallel work for larger inputs.
fallback.
Changes
Video decode
crates/multimodal/src/media.rs: sequential OpenCV frame sampling, timing logs, safer decoder-position tracking.
Qwen preprocessing
crates/multimodal/src/vision/processors/qwen_vl_base.rs: borrowed RGB video fast path, directpatchify path, parallel preprocessing.
crates/multimodal/src/vision/transforms.rs: raw-RGB bicubic resize helper.TokenSpeed handoff
model_gateway/.../grpc/multimodal.rs: packed per-item encoder tensor serialization and offset SHMtransport.
model_gateway/.../grpc/proto_wrapper.rs: tensor offset metadata.grpc_servicer/.../tokenspeed/servicer.py: offset SHM tensor reads and validation.grpc_servicer/tests/test_tokenspeed_multimodal_shm.py: SHM offset coverage.Test Plan
cargo build --release -j 4 -p smg --bin smgpytest -q grpc_servicer/tests/test_tokenspeed_multimodal_shm.pygit diff --checkChecklist
cargo +nightly fmtpassescargo clippy --all-targets --all-features -- -D warningspassesmerge PRs
Summary by CodeRabbit
New Features
Bug Fixes
Performance