Skip to content

Conversation

krishung5
Copy link
Contributor

@krishung5 krishung5 commented Oct 13, 2025

Overview:

Created a DynamoEventManager to emit kv events from KVBM. Enables for only vllm for now.

Details:

Where should the reviewer start?

Related Issues: (use one of the action keywords Closes / Fixes / Resolves / Relates to)

  • closes GitHub issue: #xxx

Summary by CodeRabbit

  • New Features
    • Optional KV cache event publishing to an external indexer, auto-configured at runtime and gracefully disabled if unavailable.
  • Refactor
    • Unified event handling under a single event manager with clearer operational logging.
  • Documentation
    • Clarified configuration for enabling or disabling event logging/publishing.
  • Tests
    • Added end-to-end tests validating store/remove KV event emission and payload integrity.

@krishung5 krishung5 marked this pull request as ready for review October 13, 2025 19:31
@krishung5 krishung5 requested a review from a team as a code owner October 13, 2025 19:31
@krishung5 krishung5 changed the title feat: Emit KV events from KVBM feat: Emit KV events from vllm KVBM Oct 13, 2025
return;
}

// todo: get namespace and component name from the component/config
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this TODO for this PR or future follow up?

@rmccorm4 rmccorm4 requested a review from PeaBrane October 13, 2025 19:36
Copy link
Contributor

coderabbitai bot commented Oct 13, 2025

Walkthrough

Adds optional KV event publishing to the block manager: introduces KvEventPublisher wiring from Python bindings and vLLM leader into runtime config; implements DynamoEventManager to publish store/remove events; updates resource initialization to select EventManager from explicit config, runtime publisher, or Null; removes legacy NATS-based dynamo module; adds tests.

Changes

Cohort / File(s) Summary
Python bindings: BlockManagerBuilder and vLLM leader connector
lib/bindings/python/rust/llm/block_manager.rs, lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs
BlockManagerBuilder gains optional kv_event_publisher field and setter; builder propagates publisher into KvManagerRuntimeConfig. vLLM leader resolves namespace/component and worker_id, attempts to create KvEventPublisher, logs outcomes, and passes it to BlockManagerBuilder.
Runtime config and event pipeline
lib/llm/src/block_manager/config.rs, lib/llm/src/block_manager/events.rs, lib/llm/src/block_manager/state/resources.rs
Adds kv_event_publisher: Option<Arc<KvEventPublisher>> to runtime config. Introduces DynamoEventManager with publishing logic (store/remove) and test constructor. Resources initialization now selects EventManager: explicit config > runtime publisher (DynamoEventManager) > NullEventManager.
Pool and state imports/args
lib/llm/src/block_manager/pool.rs, lib/llm/src/block_manager/pool/managed.rs, lib/llm/src/block_manager/state.rs
Removes NullEventManager import where unused. ManagedBlockPoolArgs.event_manager becomes Option<Arc<dyn EventManager>> with strip_option; build defaults to NullEventManager when none provided; tests updated accordingly.
Block registry visibility tweak
lib/llm/src/block_manager/block/registry.rs
Makes RegistrationHandle::from_token_block pub(crate) to allow internal usage.
Remove legacy NATS-based dynamo
lib/llm/src/block_manager/events/dynamo.rs
Deletes entire module: DynamoPublisher, NatsEventManager, state, and async progress engine.
Publisher debug improvements
lib/llm/src/kv_router/publisher.rs
Adds Debug for KvEventSource; derives Debug for KvEventPublisher.
Tests: KV event emissions
lib/llm/tests/block_manager.rs
Adds tests verifying store/remove KV cache events via DynamoEventManager::new_test, covering single and multi-block scenarios.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor App
  participant Leader as vLLM Leader
  participant Bld as BlockManagerBuilder
  participant Cfg as KvManagerRuntimeConfig
  participant Res as Resources
  participant EM as EventManager
  participant Pub as KvEventPublisher

  App->>Leader: initialize()
  Leader->>Leader: read env (namespace, component)
  Leader->>Leader: get worker_id from DRt lease
  Leader->>Pub: try create KvEventPublisher
  alt publisher created
    Leader->>Bld: kv_event_publisher(Some(Pub))
  else failure
    Leader->>Bld: kv_event_publisher(None)
  end
  Bld->>Cfg: build (attach optional publisher)
  Cfg->>Res: pass runtime config
  alt explicit event_manager in config
    Res->>EM: use provided EventManager
  else runtime has publisher
    Res->>EM: create DynamoEventManager(Pub)
  else
    Res->>EM: use NullEventManager
  end
Loading
sequenceDiagram
  autonumber
  participant Client
  participant BM as BlockManager / Registry
  participant EM as DynamoEventManager
  participant Pub as KvEventPublisher
  Note over Client,BM: Store flow
  Client->>BM: register_blocks([...])
  BM-->>Client: RegistrationHandle(s)
  Client-->>BM: drop RegistrationHandle(s)
  BM->>EM: publish(handles)
  EM->>EM: build KvCacheEvent(Stored)
  EM->>Pub: publish(event)
  Note over Client,BM: Remove flow
  Client-->>BM: drop BlockState
  BM->>EM: block_release(handle)
  EM->>EM: build KvCacheEvent(Removed)
  EM->>Pub: publish(event)
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

A whisk of wires, a hop through logs,
I thump out events, no ifs or fogs.
Old NATS burrow closed, new tunnels gleam,
KV crumbs trail neatly in a stream.
Debug carrots shine, hashes align—
Hippity-hop, the cache is fine! 🥕✨

Pre-merge checks

❌ Failed checks (2 warnings)
Check name Status Explanation Resolution
Description Check ⚠️ Warning The pull request description correctly uses the template headings but leaves the Details and Where should the reviewer start sections empty and does not identify any specific files for review. The Related Issues section also contains only a placeholder reference rather than a real issue number. Without concrete change details or review pointers, the description fails to meet the repository’s template requirements. Please populate the Details section with a summary of the implemented changes, list the key files or modules for reviewer focus in the Where should the reviewer start section, and replace the placeholder issue reference with a valid issue number or link.
Docstring Coverage ⚠️ Warning Docstring coverage is 54.55% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (1 passed)
Check name Status Explanation
Title Check ✅ Passed The title “feat: Emit KV events from vllm KVBM” succinctly captures the main feature added by this changeset, namely enabling KV event emission in the vllm block manager. It is concise, specific, and directly reflects the primary functionality introduced without extraneous detail. A reviewer scanning the commit history will understand the focus on adding event publishing to the vllm key‐value block manager.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (7)
lib/llm/src/block_manager/state.rs (1)

249-254: Fix mislabeled pool name ("disk" vs "device")

Label is unused but confusing in logs. Prefer accuracy.

Apply:

-                    create_block_pool::<_, _, Metadata>(factory, &resources, "disk")?;
+                    create_block_pool::<_, _, Metadata>(factory, &resources, "device")?;
lib/llm/src/block_manager/block/registry.rs (1)

302-303: Typo in test comment

"we shoudl" → "we should".

-        // we shoudl get two events when this is dropped, since we never took ownership of the RegistrationHandle
+        // we should get two events when this is dropped, since we never took ownership of the RegistrationHandle
lib/bindings/python/rust/llm/block_manager.rs (1)

295-301: Builder setter API ergonomics

Setter taking Option<Arc<...>> works, but most call sites will already have an Option. Consider also offering an overload that takes Arc and wraps it, to avoid Some(...) at call sites when the value is present.

 impl BlockManagerBuilder {
+    pub fn kv_event_publisher_arc(
+        mut self,
+        publisher: Arc<dynamo_llm::kv_router::publisher::KvEventPublisher>,
+    ) -> Self {
+        self.kv_event_publisher = Some(publisher);
+        self
+    }
lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs (1)

127-135: Consider sourcing namespace/component from config over env vars

Env fallbacks are fine, but if the runtime has canonical config for namespace/component, prefer that and allow env to override. Reduces drift between services.

lib/llm/src/block_manager/pool/managed.rs (1)

88-102: Fallback to NullEventManager is fine; align logs with upstream selection

This locally falls back to NullEventManager. Since state/resources.rs may already inject a real DynamoEventManager, consider a trace here noting whether an explicit event manager was provided to avoid confusing logs.

lib/llm/src/block_manager/events.rs (2)

214-253: Publish(Store) builds event correctly; minor nits

  • OK to derive parent_hash from first handle.
  • Minor: store_data.clone() is unused after constructing event; cloning is unnecessary.
-        let event = KvCacheEvent {
-            event_id: self.next_event_id(),
-            data: KvCacheEventData::Stored(store_data.clone()),
-        };
+        let event = KvCacheEvent {
+            event_id: self.next_event_id(),
+            data: KvCacheEventData::Stored(store_data),
+        };

255-277: Publish(Remove) path OK; atomic ordering can be relaxed

  • Event shape and error logging are fine.
  • next_event_id uses SeqCst; Relaxed suffices for uniqueness and reduces contention.
-        self.event_id_counter
-            .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
+        self.event_id_counter
+            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8dd104d and 5ffb689.

📒 Files selected for processing (12)
  • lib/bindings/python/rust/llm/block_manager.rs (3 hunks)
  • lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs (1 hunks)
  • lib/llm/src/block_manager/block/registry.rs (1 hunks)
  • lib/llm/src/block_manager/config.rs (1 hunks)
  • lib/llm/src/block_manager/events.rs (1 hunks)
  • lib/llm/src/block_manager/events/dynamo.rs (0 hunks)
  • lib/llm/src/block_manager/pool.rs (1 hunks)
  • lib/llm/src/block_manager/pool/managed.rs (3 hunks)
  • lib/llm/src/block_manager/state.rs (1 hunks)
  • lib/llm/src/block_manager/state/resources.rs (1 hunks)
  • lib/llm/src/kv_router/publisher.rs (2 hunks)
  • lib/llm/tests/block_manager.rs (1 hunks)
💤 Files with no reviewable changes (1)
  • lib/llm/src/block_manager/events/dynamo.rs
🧰 Additional context used
🧬 Code graph analysis (8)
lib/llm/src/block_manager/config.rs (3)
lib/llm/src/block_manager/events.rs (1)
  • publisher (303-305)
lib/llm/src/block_manager/pool/managed/state.rs (1)
  • publisher (345-347)
lib/bindings/python/src/dynamo/_core.pyi (1)
  • KvEventPublisher (773-806)
lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs (3)
lib/bindings/python/rust/llm/block_manager.rs (1)
  • kv_event_publisher (295-301)
lib/bindings/python/rust/lib.rs (1)
  • component (706-712)
lib/bindings/python/src/dynamo/_core.pyi (1)
  • KvEventPublisher (773-806)
lib/llm/src/block_manager/state/resources.rs (2)
lib/bindings/python/rust/llm/block_manager.rs (3)
  • kv_event_publisher (295-301)
  • new (89-202)
  • new (264-269)
lib/llm/src/block_manager/events.rs (5)
  • new (53-57)
  • new (90-95)
  • new (129-131)
  • new (177-182)
  • new (295-301)
lib/llm/src/block_manager/pool/managed.rs (3)
lib/llm/src/block_manager/state/resources.rs (1)
  • new (8-86)
lib/llm/src/block_manager/block/registry.rs (2)
  • new (56-64)
  • new (81-123)
lib/llm/src/block_manager/events.rs (5)
  • new (53-57)
  • new (90-95)
  • new (129-131)
  • new (177-182)
  • new (295-301)
lib/bindings/python/rust/llm/block_manager.rs (2)
lib/bindings/python/src/dynamo/_core.pyi (1)
  • KvEventPublisher (773-806)
lib/llm/src/block_manager/config.rs (4)
  • builder (42-44)
  • builder (84-86)
  • builder (131-133)
  • builder (213-215)
lib/llm/tests/block_manager.rs (2)
lib/llm/src/block_manager/block/registry.rs (5)
  • create_sequence (270-288)
  • new (56-64)
  • new (81-123)
  • drop (68-70)
  • drop (258-260)
lib/llm/src/block_manager/events.rs (4)
  • new_test (187-198)
  • drop (69-73)
  • drop (114-116)
  • publisher (303-305)
lib/llm/src/block_manager/events.rs (3)
lib/bindings/python/src/dynamo/_core.pyi (1)
  • KvEventPublisher (773-806)
lib/llm/src/kv_router/publisher.rs (11)
  • mpsc (125-125)
  • mpsc (1133-1133)
  • mpsc (1157-1157)
  • publish (169-171)
  • publish (772-785)
  • publish (1088-1099)
  • new (117-167)
  • new (714-747)
  • new (763-770)
  • new (920-922)
  • new (1075-1083)
lib/llm/src/block_manager/state/resources.rs (1)
  • new (8-86)
lib/llm/src/kv_router/publisher.rs (2)
lib/runtime/src/transports/nats.rs (1)
  • fmt (365-374)
lib/llm/src/block_manager/block.rs (3)
  • fmt (629-642)
  • fmt (761-774)
  • fmt (1004-1013)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (11)
  • GitHub Check: trtllm (arm64)
  • GitHub Check: trtllm (amd64)
  • GitHub Check: vllm (arm64)
  • GitHub Check: vllm (amd64)
  • GitHub Check: sglang
  • GitHub Check: Build and Test - dynamo
  • GitHub Check: tests (launch/dynamo-run)
  • GitHub Check: tests (lib/bindings/python)
  • GitHub Check: tests (.)
  • GitHub Check: tests (lib/runtime/examples)
  • GitHub Check: Mirror Repository to GitLab
🔇 Additional comments (11)
lib/llm/src/block_manager/state.rs (1)

17-17: Import narrowing to EventManager looks right

Consistent with centralizing Null/Dynamo selection elsewhere. No concerns.

lib/llm/src/block_manager/block/registry.rs (1)

233-244: Scope reduction to crate-private is appropriate

Keeps API surface tight while enabling internal use. Good change.

lib/llm/src/block_manager/pool.rs (1)

17-17: Import cleanup LGTM

Aligns with event manager selection centralized in resources.

lib/llm/src/kv_router/publisher.rs (1)

54-63: Debug implementations are safe and helpful

  • KvEventSource::Debug redacts JoinHandle internals.
  • KvEventPublisher derives Debug for better diagnostics.

Also applies to: 103-103

lib/llm/src/block_manager/state/resources.rs (1)

21-33: EventManager resolution order is sound

Prefers explicit manager, then Dynamo via kv_event_publisher, then Null. Clear logs. Good defaulting.

lib/llm/tests/block_manager.rs (1)

876-1056: Solid end-to-end tests for Store/Remove paths

Exercises single and multi-block publish and release via new_test. Timeouts are reasonable; assertions precise.

lib/bindings/python/rust/llm/block_manager.rs (2)

260-261: Field added: OK, fits builder defaults

Optional Arc on the builder is reasonable and defaults to None via derive(Default). No issues.


316-327: kv_event_publisher builder signature verified
KvManagerRuntimeConfigBuilder::kv_event_publisher accepts Option<Arc<dynamo_llm::kv_router::publisher::KvEventPublisher>>, matching the passed publisher.

lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs (1)

127-175: Robust optional publisher wiring with clear logs — LGTM

  • Reads namespace/component from env with sane fallbacks.
  • Uses lease id for publisher worker_id; errors degrade gracefully.

Double-check KvEventPublisher::new signature (component, worker_id, page_size, Option<_>) matches the actual API (see script in the other comment).

lib/llm/src/block_manager/pool/managed.rs (1)

599-613: Test helper mirrors production fallback — OK

Using NullEventManager in tests when none provided keeps tests isolated. Looks good.

lib/llm/src/block_manager/events.rs (1)

144-199: DynamoEventManager skeleton looks solid

  • Encapsulates publisher behind enum; provides real and test constructors.
  • next_event_id monotonic; Debug impl fine.

Comment on lines +37 to 39
#[builder(default)]
pub kv_event_publisher: Option<Arc<crate::kv_router::publisher::KvEventPublisher>>,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

🧩 Analysis chain

Add builder ergonomics for kv_event_publisher

Field addition looks good. For ease-of-use, allow passing Arc directly.

-    #[builder(default)]
-    pub kv_event_publisher: Option<Arc<crate::kv_router::publisher::KvEventPublisher>>,
+    /// Optional KV event publisher enabling real KV cache event emission.
+    #[builder(default, setter(strip_option))]
+    pub kv_event_publisher: Option<Arc<crate::kv_router::publisher::KvEventPublisher>>,

Run to confirm builder usage paths are updated (Python/leader wiring etc.):


🏁 Script executed:

#!/bin/bash
rg -n -C3 'kv_event_publisher\s*\('

Length of output: 3308


Add strip_option setter for kv_event_publisher and update callers

  • In config.rs, change to #[builder(default, setter(strip_option))] on kv_event_publisher.
  • In lib/bindings/python/rust/llm/block_manager.rs and block_manager/vllm/connector/leader.rs, remove Some(…) and pass the raw Arc<KvEventPublisher> to .kv_event_publisher(publisher) (handle Option beforehand).

Comment on lines +60 to 67
/// Event manager for publishing block registration and removal events.
///
/// **Defaults to `DynamoEventManager`** which logs KV cache events (STORE/REMOVE).
///
/// To disable event logging, explicitly provide `NullEventManager::new()`.
#[builder(default = "None", setter(strip_option))]
event_manager: Option<Arc<dyn EventManager>>,

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Docstring contradicts behavior (defaults to NullEventManager, not Dynamo)

Comments say default is DynamoEventManager, but code defaults to None here and is resolved to NullEventManager later in build(). Upstream selection of Dynamo happens in state/resources.rs when a kv_event_publisher exists. Please fix the doc to reflect this.

-    /// **Defaults to `DynamoEventManager`** which logs KV cache events (STORE/REMOVE).
-    ///
-    /// To disable event logging, explicitly provide `NullEventManager::new()`.
+    /// If not provided here, the BlockManager will choose:
+    /// - `DynamoEventManager` when a `kv_event_publisher` is present in the runtime config, or
+    /// - `NullEventManager` (no event publishing) otherwise.
+    /// To force-disable logging, explicitly provide `NullEventManager::new()`.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/// Event manager for publishing block registration and removal events.
///
/// **Defaults to `DynamoEventManager`** which logs KV cache events (STORE/REMOVE).
///
/// To disable event logging, explicitly provide `NullEventManager::new()`.
#[builder(default = "None", setter(strip_option))]
event_manager: Option<Arc<dyn EventManager>>,
/// Event manager for publishing block registration and removal events.
///
/// If not provided here, the BlockManager will choose:
/// - `DynamoEventManager` when a `kv_event_publisher` is present in the runtime config, or
/// - `NullEventManager` (no event publishing) otherwise.
/// To force-disable logging, explicitly provide `NullEventManager::new()`.
#[builder(default = "None", setter(strip_option))]
event_manager: Option<Arc<dyn EventManager>>,
🤖 Prompt for AI Agents
In lib/llm/src/block_manager/pool/managed.rs around lines 60 to 67, the
docstring wrongly states the default is DynamoEventManager; update it to state
that the builder field defaults to None (which results in a NullEventManager by
default) and that DynamoEventManager is selected upstream (in
state/resources.rs) when a kv_event_publisher exists; keep the note about
disabling event logging by explicitly providing NullEventManager::new() and
adjust wording to match the actual builder/default behavior.

Copy link
Contributor

coderabbitai bot commented Oct 13, 2025

Walkthrough

Adds optional KV event publishing across the block manager stack. Introduces DynamoEventManager and integrates an optional KvEventPublisher into runtime config, builder, and vLLM leader. Selects EventManager at runtime based on provided manager or publisher. Replaces/removes legacy NATS-based event manager. Adds Debug for publisher types and new end-to-end tests.

Changes

Cohort / File(s) Summary
Builder integration for KV publisher
lib/bindings/python/rust/llm/block_manager.rs
BlockManagerBuilder gains optional kv_event_publisher field and setter; build path conditionally injects publisher into KvManagerRuntimeConfig.
vLLM leader publisher wiring
lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs
Leader attempts to resolve and create KvEventPublisher using env/lease; logs failures; passes Option publisher to BlockManagerBuilder; non-fatal if unavailable.
Runtime config surface
lib/llm/src/block_manager/config.rs
KvManagerRuntimeConfig adds kv_event_publisher: Option<Arc> with builder default.
Event manager implementation (new)
lib/llm/src/block_manager/events.rs
Adds DynamoEventManager with PublisherImpl (real/mock), implements EventManager, EventPublisher, EventReleaseManager; emits Stored/Removed KvCacheEvents; test constructor provided.
Legacy NATS event manager removal
lib/llm/src/block_manager/events/dynamo.rs
Removes DynamoPublisher, NatsEventManager, and related routing/progress engine logic.
Pool args optional event manager
lib/llm/src/block_manager/pool/managed.rs
ManagedBlockPoolArgs.event_manager becomes Option; build chooses provided, or DynamoEventManager if runtime publisher set, else NullEventManager.
Resource selection of EventManager
lib/llm/src/block_manager/state/resources.rs
Selects EventManager: provided -> DynamoEventManager (if kv_event_publisher present) -> NullEventManager; logs selection.
Imports adjusted
lib/llm/src/block_manager/pool.rs, lib/llm/src/block_manager/state.rs
Update imports to reflect new/default EventManager handling; remove direct NullEventManager import where unused.
Registration handle visibility
lib/llm/src/block_manager/block/registry.rs
from_token_block visibility widened to pub(crate).
Publisher Debug implementations
lib/llm/src/kv_router/publisher.rs
Derive Debug for KvEventPublisher; custom Debug for KvEventSource.
Tests for KV events
lib/llm/tests/block_manager.rs
Adds tests verifying DynamoEventManager emits Store/Remove events (single and batch), with mock path via channel.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Leader as KvConnectorLeader
  participant Resolver as ComponentResolver
  participant Pub as KvEventPublisher (optional)
  participant Bldr as BlockManagerBuilder
  participant RT as KvManagerRuntimeConfig
  participant Pool as ManagedBlockPool

  Leader->>Resolver: resolve(namespace, component)
  alt Resolve OK
    Resolver-->>Leader: Component
    Leader->>Leader: derive worker_id, page_size
    Leader->>Pub: create publisher (optional)
    Note right of Leader: On error, skip publisher
  else Resolve Fail
    Leader->>Leader: publisher = None
  end
  Leader->>Bldr: kv_event_publisher(publisher)
  Bldr->>RT: build() with optional kv_event_publisher
  RT-->>Pool: runtime config
  Pool->>Pool: select EventManager<br/>- Provided<br/>- else DynamoEventManager if RT.publisher<br/>- else NullEventManager
  Pool-->>Leader: BlockManager ready
Loading
sequenceDiagram
  autonumber
  participant Client
  participant BM as BlockManager
  participant EM as DynamoEventManager
  participant Pub as KvEventPublisher
  participant Indexer as KV Indexer

  Client->>BM: register blocks
  BM->>EM: publish(handles)
  EM->>Pub: publish Stored(KvCacheStoreData)
  Pub-->>Indexer: event
  Client--x BM: drop RegistrationHandle
  BM->>EM: block_release(handle)
  EM->>Pub: publish Removed(KvCacheRemoveData)
  Pub-->>Indexer: event
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

A whisk of wires, a hop of logs,
I bound through blocks and catalogues.
Events now leap with gentle flair,
If pubs are there—or plain old air.
I twitch my ears at Debug’s glow,
Then thump: “It’s shipped!” Now watch it go. 🐇✨

Pre-merge checks

❌ Failed checks (2 warnings)
Check name Status Explanation Resolution
Description Check ⚠️ Warning The description follows the required template but leaves the Details and Where should the reviewer start? sections empty and retains placeholder text without specifying actual changes or file pointers, and the Related Issues entry still uses a placeholder. Please populate the Details section with a concise summary of the changes made, list the key files for reviewer focus under Where should the reviewer start?, and replace the placeholder issue number in Related Issues with the actual issue reference.
Docstring Coverage ⚠️ Warning Docstring coverage is 54.55% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (1 passed)
Check name Status Explanation
Title Check ✅ Passed The title “feat: Emit KV events from vllm KVBM” clearly and concisely highlights the main change of adding KV event emission support to the vllm block manager, matching the pull request’s primary objective without unnecessary detail or noise.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
lib/llm/src/kv_router/publisher.rs (1)

456-475: Bounds check to prevent slice panic on malformed events

Slicing token_ids without verifying length can panic if payloads are inconsistent. Add a guard before slicing.

-        let tokens = &token_ids[token_offset..(token_offset + *num_tokens_it as usize)];
+        let end = token_offset + (*num_tokens_it as usize);
+        if end > token_ids.len() {
+            if warning_count.fetch_add(1, Ordering::Relaxed) < 3 {
+                tracing::warn!(
+                    expected=%*num_tokens_it,
+                    available=%(token_ids.len().saturating_sub(token_offset)),
+                    "Insufficient token_ids for block; dropping remaining blocks"
+                );
+            }
+            break;
+        }
+        let tokens = &token_ids[token_offset..end];
🧹 Nitpick comments (7)
lib/llm/src/kv_router/publisher.rs (3)

125-126: Unbounded mpsc can OOM under backpressure

ZMQ/source → processor uses unbounded channel. If publisher (NATS) stalls, memory can grow unbounded. Prefer bounded mpsc with a backpressure strategy (drop oldest, coalesce batches, or spawn bounded worker pool).

Example change at creation site:

- let (tx, rx) = mpsc::unbounded_channel::<KvCacheEvent>();
+ let (tx, rx) = mpsc::channel::<KvCacheEvent>(1024);

And update publish signature to map SendError to TrySendError as needed.

Also applies to: 199-201


153-159: Connect failure is silent to callers; consider surfacing readiness/errors

If NATS connect fails, the task returns, rx is dropped, and future publish() calls error with SendError—hard to trace. Consider:

  • Await connect before returning from new(), or
  • Provide a readiness oneshot/flag, or
  • Add retry/backoff on connect.

Minimal improvement:

  • Emit a warn/error metric and store an internal “failed” state to log once per publish() error.

499-506: Signed-to-u64 cast may admit negative values

Casting negative i64 to u64 will wrap. If negatives are unexpected, reject or log-and-skip to avoid surprising hashes.

lib/llm/src/block_manager/config.rs (1)

37-39: New kv_event_publisher field: OK

Optional Arc is fine and backward-compatible. Consider documenting lifetimes/ownership and whether multiple managers may share one publisher. If coupling is a concern, you could hide the concrete type behind a trait in the future.

lib/llm/src/block_manager/state/resources.rs (1)

21-32: Event manager selection logic looks solid

Explicit > Dynamo (when publisher present) > Null fallback is clear. Consider downgrading these to debug to avoid noisy info logs during startup.

lib/llm/src/block_manager/pool/managed.rs (1)

96-102: Explicit NullEventManager fallback is correct; consider logging choice

Optional: add a debug/info log stating which manager is used to aid troubleshooting.

-        let event_manager = if let Some(event_manager) = event_manager_opt {
+        let event_manager = if let Some(event_manager) = event_manager_opt {
             event_manager
         } else {
-            crate::block_manager::events::NullEventManager::new()
+            tracing::info!("ManagedBlockPool: using NullEventManager (no event publishing)");
+            crate::block_manager::events::NullEventManager::new()
         };
lib/llm/src/block_manager/events.rs (1)

200-204: SeqCst is unnecessary; Relaxed is sufficient for a monotonic counter

No cross-thread ordering needed beyond uniqueness.

-        self.event_id_counter
-            .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
+        self.event_id_counter
+            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8dd104d and 5ffb689.

📒 Files selected for processing (12)
  • lib/bindings/python/rust/llm/block_manager.rs (3 hunks)
  • lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs (1 hunks)
  • lib/llm/src/block_manager/block/registry.rs (1 hunks)
  • lib/llm/src/block_manager/config.rs (1 hunks)
  • lib/llm/src/block_manager/events.rs (1 hunks)
  • lib/llm/src/block_manager/events/dynamo.rs (0 hunks)
  • lib/llm/src/block_manager/pool.rs (1 hunks)
  • lib/llm/src/block_manager/pool/managed.rs (3 hunks)
  • lib/llm/src/block_manager/state.rs (1 hunks)
  • lib/llm/src/block_manager/state/resources.rs (1 hunks)
  • lib/llm/src/kv_router/publisher.rs (2 hunks)
  • lib/llm/tests/block_manager.rs (1 hunks)
💤 Files with no reviewable changes (1)
  • lib/llm/src/block_manager/events/dynamo.rs
🧰 Additional context used
🧬 Code graph analysis (8)
lib/llm/src/block_manager/config.rs (3)
lib/llm/src/block_manager/events.rs (1)
  • publisher (303-305)
lib/llm/src/block_manager/pool/managed/state.rs (1)
  • publisher (345-347)
lib/bindings/python/src/dynamo/_core.pyi (1)
  • KvEventPublisher (773-806)
lib/llm/src/block_manager/state/resources.rs (2)
lib/bindings/python/rust/llm/block_manager.rs (3)
  • kv_event_publisher (295-301)
  • new (89-202)
  • new (264-269)
lib/llm/src/block_manager/events.rs (5)
  • new (53-57)
  • new (90-95)
  • new (129-131)
  • new (177-182)
  • new (295-301)
lib/bindings/python/rust/llm/block_manager.rs (2)
lib/bindings/python/src/dynamo/_core.pyi (1)
  • KvEventPublisher (773-806)
lib/llm/src/block_manager/config.rs (4)
  • builder (42-44)
  • builder (84-86)
  • builder (131-133)
  • builder (213-215)
lib/llm/src/block_manager/pool/managed.rs (2)
lib/llm/src/block_manager/state/resources.rs (1)
  • new (8-86)
lib/llm/src/block_manager/events.rs (5)
  • new (53-57)
  • new (90-95)
  • new (129-131)
  • new (177-182)
  • new (295-301)
lib/llm/src/kv_router/publisher.rs (2)
lib/runtime/src/transports/nats.rs (1)
  • fmt (365-374)
lib/llm/src/block_manager/block.rs (3)
  • fmt (629-642)
  • fmt (761-774)
  • fmt (1004-1013)
lib/llm/tests/block_manager.rs (3)
lib/runtime/src/worker.rs (2)
  • tokio (147-147)
  • from_current (198-205)
lib/llm/src/block_manager/block/registry.rs (5)
  • create_sequence (270-288)
  • new (56-64)
  • new (81-123)
  • drop (68-70)
  • drop (258-260)
lib/llm/src/block_manager/events.rs (4)
  • new_test (187-198)
  • drop (69-73)
  • drop (114-116)
  • publisher (303-305)
lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs (3)
lib/bindings/python/rust/llm/block_manager.rs (1)
  • kv_event_publisher (295-301)
lib/bindings/python/rust/lib.rs (1)
  • component (706-712)
lib/bindings/python/src/dynamo/_core.pyi (1)
  • KvEventPublisher (773-806)
lib/llm/src/block_manager/events.rs (3)
lib/llm/src/kv_router/publisher.rs (11)
  • mpsc (125-125)
  • mpsc (1133-1133)
  • mpsc (1157-1157)
  • publish (169-171)
  • publish (772-785)
  • publish (1088-1099)
  • new (117-167)
  • new (714-747)
  • new (763-770)
  • new (920-922)
  • new (1075-1083)
lib/llm/src/kv_router/indexer.rs (8)
  • mpsc (797-797)
  • mpsc (798-798)
  • mpsc (799-799)
  • mpsc (800-800)
  • mpsc (1048-1048)
  • mpsc (1050-1050)
  • mpsc (1051-1051)
  • new (193-195)
lib/llm/src/block_manager/state/resources.rs (1)
  • new (8-86)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
  • GitHub Check: trtllm (arm64)
  • GitHub Check: trtllm (amd64)
  • GitHub Check: vllm (arm64)
  • GitHub Check: vllm (amd64)
  • GitHub Check: sglang
  • GitHub Check: Build and Test - dynamo
  • GitHub Check: clippy (launch/dynamo-run)
  • GitHub Check: clippy (.)
  • GitHub Check: tests (launch/dynamo-run)
  • GitHub Check: tests (lib/bindings/python)
  • GitHub Check: clippy (lib/bindings/python)
  • GitHub Check: tests (.)
  • GitHub Check: tests (lib/runtime/examples)
  • GitHub Check: Mirror Repository to GitLab
🔇 Additional comments (17)
lib/llm/src/kv_router/publisher.rs (2)

54-63: Good: custom Debug with redaction

Custom Debug avoids leaking JoinHandle internals. Matches existing style in transports.


103-114: Deriving Debug for KvEventPublisher is fine

All fields implement Debug; no sensitive data printed.

lib/llm/src/block_manager/block/registry.rs (1)

233-245: Visibility tweak looks right

pub(crate) narrows exposure while enabling crate-internal constructors. No functional change.

lib/llm/src/block_manager/pool.rs (1)

17-17: Import update is consistent with new event plumbing

Switching to EventManager keeps surface generic and aligns with refactor.

lib/llm/src/block_manager/state.rs (1)

17-17: Import aligns with new event manager selection

No behavioral change here.

lib/llm/tests/block_manager.rs (1)

877-1056: Tests cover key event emission paths

Good coverage for store/remove and multi-block batching via test-mode DynamoEventManager.

lib/llm/src/block_manager/pool/managed.rs (3)

88-94: Rename aligns with Option; LGTM


599-606: Args dissolve update is consistent with optional event_manager


607-613: Test builder fallback mirrors production path; LGTM

lib/bindings/python/rust/llm/block_manager.rs (3)

260-261: New field for KvEventPublisher wiring looks good

Public surface addition is reasonable and scoped.


295-301: Setter API is fine; accepts Option for explicit None

No issues.


316-327: Approve runtime config publisher propagation
Verified KvManagerRuntimeConfig has a kv_event_publisher field and the builder exposes its setter (via derive macro); conditional injection correctly preserves behavior when None.

lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs (1)

127-176: Robust optional publisher creation; good error handling

Environment defaults + lease-derived worker_id and graceful fallback are sensible.

lib/llm/src/block_manager/events.rs (4)

144-150: Introduction of DynamoEventManager is clear

Struct design is straightforward and object-safe.


175-183: Constructor with real publisher: LGTM


186-199: Test constructor is useful for E2E verification

Good testability hook.


255-277: Remove event emission: LGTM

Single-block RAII removal is appropriate.

Comment on lines +214 to +253
impl EventPublisher for DynamoEventManager {
fn publish(&self, handles: Vec<Arc<RegistrationHandle>>) {
use crate::kv_router::protocols::{
ExternalSequenceBlockHash, KvCacheEvent, KvCacheEventData, KvCacheStoreData,
KvCacheStoredBlockData, LocalBlockHash,
};

if handles.is_empty() {
return;
}

let parent_hash = handles
.first()
.and_then(|h| h.parent_sequence_hash())
.map(|hash| ExternalSequenceBlockHash(hash));

let blocks: Vec<KvCacheStoredBlockData> = handles
.iter()
.map(|handle| KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(handle.sequence_hash()),
tokens_hash: LocalBlockHash(handle.block_hash()),
})
.collect();

let store_data = KvCacheStoreData {
parent_hash,
blocks,
};

let event = KvCacheEvent {
event_id: self.next_event_id(),
data: KvCacheEventData::Stored(store_data.clone()),
};

// Publish to the indexer
if let Err(e) = self.publisher.publish(event) {
tracing::error!("Failed to publish STORED event to indexer: {}", e);
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Batch publish assumes a single parent; split by parent to avoid malformed events

If handles span different parent hashes, emitting one STORED event is ambiguous. Group by parent_hash and emit one event per group.

Apply this refactor sketch:

-        let parent_hash = handles
-            .first()
-            .and_then(|h| h.parent_sequence_hash())
-            .map(|hash| ExternalSequenceBlockHash(hash));
-
-        let blocks: Vec<KvCacheStoredBlockData> = handles
-            .iter()
-            .map(|handle| KvCacheStoredBlockData {
-                block_hash: ExternalSequenceBlockHash(handle.sequence_hash()),
-                tokens_hash: LocalBlockHash(handle.block_hash()),
-            })
-            .collect();
-
-        let store_data = KvCacheStoreData { parent_hash, blocks };
-        let event = KvCacheEvent { event_id: self.next_event_id(), data: KvCacheEventData::Stored(store_data.clone()) };
-        if let Err(e) = self.publisher.publish(event) { ... }
+        use std::collections::BTreeMap;
+        let mut groups: BTreeMap<Option<ExternalSequenceBlockHash>, Vec<KvCacheStoredBlockData>> =
+            BTreeMap::new();
+        for handle in handles {
+            let key = handle.parent_sequence_hash().map(ExternalSequenceBlockHash);
+            groups.entry(key).or_default().push(KvCacheStoredBlockData {
+                block_hash: ExternalSequenceBlockHash(handle.sequence_hash()),
+                tokens_hash: LocalBlockHash(handle.block_hash()),
+            });
+        }
+        for (parent_hash, blocks) in groups {
+            let store_data = KvCacheStoreData { parent_hash, blocks };
+            let event = KvCacheEvent {
+                event_id: self.next_event_id(),
+                data: KvCacheEventData::Stored(store_data),
+            };
+            if let Err(e) = self.publisher.publish(event) {
+                tracing::error!("Failed to publish STORED event to indexer: {}", e);
+            }
+        }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
impl EventPublisher for DynamoEventManager {
fn publish(&self, handles: Vec<Arc<RegistrationHandle>>) {
use crate::kv_router::protocols::{
ExternalSequenceBlockHash, KvCacheEvent, KvCacheEventData, KvCacheStoreData,
KvCacheStoredBlockData, LocalBlockHash,
};
if handles.is_empty() {
return;
}
let parent_hash = handles
.first()
.and_then(|h| h.parent_sequence_hash())
.map(|hash| ExternalSequenceBlockHash(hash));
let blocks: Vec<KvCacheStoredBlockData> = handles
.iter()
.map(|handle| KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(handle.sequence_hash()),
tokens_hash: LocalBlockHash(handle.block_hash()),
})
.collect();
let store_data = KvCacheStoreData {
parent_hash,
blocks,
};
let event = KvCacheEvent {
event_id: self.next_event_id(),
data: KvCacheEventData::Stored(store_data.clone()),
};
// Publish to the indexer
if let Err(e) = self.publisher.publish(event) {
tracing::error!("Failed to publish STORED event to indexer: {}", e);
}
}
}
impl EventPublisher for DynamoEventManager {
fn publish(&self, handles: Vec<Arc<RegistrationHandle>>) {
use crate::kv_router::protocols::{
ExternalSequenceBlockHash, KvCacheEvent, KvCacheEventData, KvCacheStoreData,
KvCacheStoredBlockData, LocalBlockHash,
};
use std::collections::BTreeMap;
if handles.is_empty() {
return;
}
let mut groups: BTreeMap<Option<ExternalSequenceBlockHash>, Vec<KvCacheStoredBlockData>> =
BTreeMap::new();
for handle in handles {
let key = handle.parent_sequence_hash().map(ExternalSequenceBlockHash);
groups.entry(key).or_default().push(KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(handle.sequence_hash()),
tokens_hash: LocalBlockHash(handle.block_hash()),
});
}
for (parent_hash, blocks) in groups {
let store_data = KvCacheStoreData { parent_hash, blocks };
let event = KvCacheEvent {
event_id: self.next_event_id(),
data: KvCacheEventData::Stored(store_data),
};
if let Err(e) = self.publisher.publish(event) {
tracing::error!("Failed to publish STORED event to indexer: {}", e);
}
}
}
}
🤖 Prompt for AI Agents
lib/llm/src/block_manager/events.rs lines 214-253: the current publish
implementation assumes all handles share a single parent and emits one STORED
event for all handles, which can produce malformed/ambiguous events when handles
have different parent_sequence_hashes; change the logic to group the handles by
their parent_sequence_hash (treating None as a separate group), then for each
group build a Vec<KvCacheStoredBlockData> from that group's handles, create a
KvCacheStoreData with the group's parent_hash mapped to
ExternalSequenceBlockHash when Some or None otherwise, construct a KvCacheEvent
per group (with event_id from next_event_id and
data::Stored(store_data.clone())) and call self.publisher.publish for each
event, logging errors individually; keep the early return for empty handles.

@PeaBrane
Copy link
Contributor

@krishung5 thanks! Do you mind having a look at the rabbit comments and address them if they make sense


/// Event manager that emits KV cache events to the indexer.
pub struct DynamoEventManager {
event_id_counter: std::sync::atomic::AtomicU64,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit. But there are several inline imports like std::sync crate::kv_router::publisher etc, can we move them to top?

@PeaBrane
Copy link
Contributor

I think it generally LGTM, maybe outside the scope here, but if possible, can we run some e2e tests with vllm engines using kvbm, and send some requests to it (with some shared system prompt), and see the router outputs in the terminal make sense (and no warnings)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants