feat(acp-nats): add Bridge infrastructure for session cancel and ready#34
feat(acp-nats): add Bridge infrastructure for session cancel and ready#34
Conversation
PR SummaryMedium Risk Overview Extends prompt waiter handling with Tweaks config ergonomics and safety by adding Written by Cursor Bugbot for commit e17abc5. This will update automatically on new commits. Configure here. |
|
Caution Review failedPull request was closed or merged during review WalkthroughAdds TTL-backed cancelled-session tracking and delayed fire-and-forget session.ready publishing with task tracking to the agent Bridge; consolidates NATS client-subject parsing (ClientMethod + ParsedClientSubject) into a single parsing module; adds wildcard subject builders; extends pending-prompt waiter cancellation; and small config API/test tweaks. Changes
Sequence Diagram(s)sequenceDiagram
participant Bridge as Bridge
participant Clock as Clock (GetElapsed)
participant Cancelled as CancelledSessions
participant Spawner as Tokio TaskSpawner
participant Nats as NatsClient
Bridge->>Clock: read SESSION_READY_DELAY
Bridge->>Spawner: spawn delayed task (spawn_session_ready)
Spawner-->>Bridge: JoinHandle returned (registered)
Note over Spawner,Bridge: task sleeps for delay
Spawner->>Clock: check elapsed for session
Spawner->>Cancelled: take_if_cancelled(session_id, clock)
alt cancelled
Cancelled-->>Spawner: Some -> exit without publish
else not cancelled
Spawner->>Nats: publish session.ready (ExtSessionReady)
Nats-->>Spawner: publish ack / metrics recorded
end
Spawner-->>Bridge: task completes (JoinHandle later awaited/cleaned)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
📝 Coding Plan
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, have a team admin enable autofix in the Cursor dashboard.
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rsworkspace/crates/acp-nats/src/nats/parsing.rs (1)
3-55:⚠️ Potential issue | 🟡 MinorRun
cargo fmton this file.CI is already failing rustfmt for this hunk, so it will not merge cleanly as-is.
Also applies to: 194-286
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rsworkspace/crates/acp-nats/src/nats/parsing.rs` around lines 3 - 55, The code in the ClientMethod enum and the parse_client_subject function is misformatted; run rustfmt (cargo fmt) on the file to fix spacing/indentation issues, then commit the formatted changes — ensure the formatting covers the ClientMethod enum, ParsedClientSubject struct, and parse_client_subject function so the file passes CI rustfmt checks.
🧹 Nitpick comments (1)
rsworkspace/crates/acp-nats/src/nats/subjects.rs (1)
40-52: Prefer validated value objects in these wildcard builders.Because these helpers produce subscription patterns, raw
&strinputs letsession_id = "*" / ">"or a wildcarded prefix silently widen the subscription scope. I would avoid adding more public primitive-based APIs here and take the existing prefix/session-id value objects instead.As per coding guidelines, "Prefer domain-specific value objects over primitives (e.g.,
AcpPrefixnotString), with each type's factory guaranteeing correctness at construction—invalid instances should be unrepresentable".Also applies to: 108-110
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rsworkspace/crates/acp-nats/src/nats/subjects.rs` around lines 40 - 52, The wildcard helper functions (wildcards::global, wildcards::all_sessions, wildcards::session) currently accept raw &str which allows callers to pass wildcard characters and widen subscriptions; change their signatures to accept the domain value objects (use AcpPrefix for prefix and the existing SessionId type for session_id) so construction enforces validity, update callers to pass those types, and convert to string patterns inside the functions (e.g., call prefix.as_ref() or prefix.to_string() and session_id.as_ref()/to_string() as needed); do the same refactor for the equivalent helpers at lines 108-110 so all public wildcard builders use the validated value objects instead of primitives.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@rsworkspace/crates/acp-nats/src/agent/mod.rs`:
- Around line 114-122: The has_pending_session_ready_tasks method currently
returns true if the vector has any handles even if they are finished; update
has_pending_session_ready_tasks to first lock session_ready_publish_tasks,
remove finished handles (use the same retention logic as
register_session_ready_task: retain(|task| !task.is_finished())), then return
whether the collection is non-empty. This ensures
has_pending_session_ready_tasks accurately reflects active work without changing
register_session_ready_task or await_session_ready_tasks.
In `@rsworkspace/crates/acp-nats/src/config.rs`:
- Around line 10-15: The current implementation only validates the prefix in
AcpPrefix::new and leaves composite subject capacity unchecked
(validate_subject_capacity exists but is only used in tests), and
MAX_SESSION_ID_LENGTH is duplicated; fix by enforcing composite subject-capacity
either inside the prefix/session-id value object factory or unconditionally in
the config constructors: update AcpPrefix::new (or the SessionId factory/type)
to check prefix.len() + session_id_max_len + MAX_SESSIONED_SUBJECT_SUFFIX_LEN <=
NATS_MAX_SUBJECT_BYTES (or call validate_subject_capacity from AcpPrefix::new),
remove the duplicated MAX_SESSION_ID_LENGTH constant from this module and move
the session id length limit into a single SessionId value object (e.g.,
SessionId::new) so all validations (per-type length and composite subject
capacity) are centralized and invoked by Config::with_prefix and
Config::for_test implicitly via the value-object constructors.
---
Outside diff comments:
In `@rsworkspace/crates/acp-nats/src/nats/parsing.rs`:
- Around line 3-55: The code in the ClientMethod enum and the
parse_client_subject function is misformatted; run rustfmt (cargo fmt) on the
file to fix spacing/indentation issues, then commit the formatted changes —
ensure the formatting covers the ClientMethod enum, ParsedClientSubject struct,
and parse_client_subject function so the file passes CI rustfmt checks.
---
Nitpick comments:
In `@rsworkspace/crates/acp-nats/src/nats/subjects.rs`:
- Around line 40-52: The wildcard helper functions (wildcards::global,
wildcards::all_sessions, wildcards::session) currently accept raw &str which
allows callers to pass wildcard characters and widen subscriptions; change their
signatures to accept the domain value objects (use AcpPrefix for prefix and the
existing SessionId type for session_id) so construction enforces validity,
update callers to pass those types, and convert to string patterns inside the
functions (e.g., call prefix.as_ref() or prefix.to_string() and
session_id.as_ref()/to_string() as needed); do the same refactor for the
equivalent helpers at lines 108-110 so all public wildcard builders use the
validated value objects instead of primitives.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: e7db67c6-ee95-4adc-83eb-38917b3237ee
📒 Files selected for processing (8)
rsworkspace/crates/acp-nats/src/agent/mod.rsrsworkspace/crates/acp-nats/src/config.rsrsworkspace/crates/acp-nats/src/nats/client_method.rsrsworkspace/crates/acp-nats/src/nats/mod.rsrsworkspace/crates/acp-nats/src/nats/parsed_client_subject.rsrsworkspace/crates/acp-nats/src/nats/parsing.rsrsworkspace/crates/acp-nats/src/nats/subjects.rsrsworkspace/crates/acp-nats/src/pending_prompt_waiters.rs
💤 Files with no reviewable changes (2)
- rsworkspace/crates/acp-nats/src/nats/client_method.rs
- rsworkspace/crates/acp-nats/src/nats/parsed_client_subject.rs
b09a8f2 to
b9fc664
Compare
There was a problem hiding this comment.
Actionable comments posted: 2
♻️ Duplicate comments (1)
rsworkspace/crates/acp-nats/src/agent/mod.rs (1)
114-116:⚠️ Potential issue | 🟡 Minor
has_pending_session_ready_taskscan return staletrue.Line 115 only checks
is_empty(). Finished handles remain in the vector until registration/await, so this can report pending work when none is active.🔧 Suggested fix
pub fn has_pending_session_ready_tasks(&self) -> bool { - !self.session_ready_publish_tasks.lock().unwrap().is_empty() + let mut tasks = self.session_ready_publish_tasks.lock().unwrap(); + tasks.retain(|task| !task.is_finished()); + !tasks.is_empty() }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rsworkspace/crates/acp-nats/src/agent/mod.rs` around lines 114 - 116, The method has_pending_session_ready_tasks currently returns true if the vec in session_ready_publish_tasks is non-empty, but finished JoinHandles remain there and cause stale true; update has_pending_session_ready_tasks to lock session_ready_publish_tasks, either call retain(|h| !h.is_finished()) to remove completed handles and then check is_empty(), or return handles.iter().any(|h| !h.is_finished()), referencing the session_ready_publish_tasks field and the has_pending_session_ready_tasks function to locate where to change the logic.
🧹 Nitpick comments (1)
rsworkspace/crates/acp-nats/src/nats/subjects.rs (1)
41-50: Use typed subject tokens in wildcard builders (AcpPrefix/SessionId).These new helpers currently take raw
&str, which allows invalid tokens or accidental wildcard-bearing input into subject construction. Prefer value-object parameters so invalid instances remain unrepresentable.💡 Suggested direction
- pub fn session(prefix: &str, session_id: &str) -> String { - format!("{}.{}.agent.>", prefix, session_id) - } + pub fn session(prefix: &AcpPrefix, session_id: &SessionId) -> String { + format!("{}.{}.agent.>", prefix.as_str(), session_id) + }As per coding guidelines: "Prefer domain-specific value objects over primitives (e.g.,
AcpPrefixnotString), with each type's factory guaranteeing correctness at construction—invalid instances should be unrepresentable".Also applies to: 108-109
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rsworkspace/crates/acp-nats/src/nats/subjects.rs` around lines 41 - 50, The three subject builders global, all_sessions, and session accept raw &str allowing invalid or wildcard-bearing input; change their signatures to take domain value objects (e.g., AcpPrefix for prefix and SessionId for session_id) and use the value-object accessor (e.g., as_str() or into_inner()) to format the subject; ensure AcpPrefix and SessionId provide constructors that validate/forbid wildcards so callers cannot construct invalid tokens, and then update all call sites (including the other helpers referenced in the file) to pass the typed values instead of raw &str.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@rsworkspace/crates/acp-nats/src/agent/mod.rs`:
- Around line 49-55: The formatting of the mark_cancelled function is not
rustfmt-compliant; run rustfmt (or cargo fmt --all) and reformat the block
around the mark_cancelled method (the code that locks self.map, inserts the
session_id with clock.now(), increments cleanup_counter, and calls map.retain
when count.is_multiple_of(CLEANUP_EVERY)) so spacing, braces, and indentation
match project style (including ensuring the if block and map.retain call are
properly indented and line-broken); after formatting, re-run cargo fmt --all --
--check to confirm the change.
In `@rsworkspace/crates/acp-nats/src/config.rs`:
- Around line 152-156: The test block for Config::with_prefix (using
AcpPrefix::new and asserting config.acp_prefix()) is failing the formatting
check; run rustfmt (cargo fmt --all) to reformat this test function so it
complies with rustfmt rules and then update the PR with the reformatted file so
CI passes.
---
Duplicate comments:
In `@rsworkspace/crates/acp-nats/src/agent/mod.rs`:
- Around line 114-116: The method has_pending_session_ready_tasks currently
returns true if the vec in session_ready_publish_tasks is non-empty, but
finished JoinHandles remain there and cause stale true; update
has_pending_session_ready_tasks to lock session_ready_publish_tasks, either call
retain(|h| !h.is_finished()) to remove completed handles and then check
is_empty(), or return handles.iter().any(|h| !h.is_finished()), referencing the
session_ready_publish_tasks field and the has_pending_session_ready_tasks
function to locate where to change the logic.
---
Nitpick comments:
In `@rsworkspace/crates/acp-nats/src/nats/subjects.rs`:
- Around line 41-50: The three subject builders global, all_sessions, and
session accept raw &str allowing invalid or wildcard-bearing input; change their
signatures to take domain value objects (e.g., AcpPrefix for prefix and
SessionId for session_id) and use the value-object accessor (e.g., as_str() or
into_inner()) to format the subject; ensure AcpPrefix and SessionId provide
constructors that validate/forbid wildcards so callers cannot construct invalid
tokens, and then update all call sites (including the other helpers referenced
in the file) to pass the typed values instead of raw &str.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 7336c392-7042-4d95-8064-82ac90c9f387
📒 Files selected for processing (8)
rsworkspace/crates/acp-nats/src/agent/mod.rsrsworkspace/crates/acp-nats/src/config.rsrsworkspace/crates/acp-nats/src/nats/client_method.rsrsworkspace/crates/acp-nats/src/nats/mod.rsrsworkspace/crates/acp-nats/src/nats/parsed_client_subject.rsrsworkspace/crates/acp-nats/src/nats/parsing.rsrsworkspace/crates/acp-nats/src/nats/subjects.rsrsworkspace/crates/acp-nats/src/pending_prompt_waiters.rs
💤 Files with no reviewable changes (2)
- rsworkspace/crates/acp-nats/src/nats/client_method.rs
- rsworkspace/crates/acp-nats/src/nats/parsed_client_subject.rs
🚧 Files skipped from review as they are similar to previous changes (3)
- rsworkspace/crates/acp-nats/src/nats/parsing.rs
- rsworkspace/crates/acp-nats/src/pending_prompt_waiters.rs
- rsworkspace/crates/acp-nats/src/nats/mod.rs
d3a979e to
06c4495
Compare
Code Coverage SummaryDetailsDiff against mainResults for commit: e17abc5 Minimum allowed coverage is ♻️ This comment has been updated with latest results |
06c4495 to
220d4ff
Compare
Add CancelledSessions, spawn_session_ready, session-ready task tracking, cancel_waiter_for_session, new NATS wildcard subjects, and parsing consolidation. Includes comprehensive unit tests for all new code paths. Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
220d4ff to
e17abc5
Compare

Summary
CancelledSessionsstruct with TTL-based eviction for pre-flight cancel trackingspawn_session_readyand session-ready task management toBridgeClientMethod/ParsedClientSubjectintoparsing.rs, add agent/client wildcard subjectscancel_waiter_for_sessiontoPendingPromptWaitersfor external cancellationConfig::with_prefixalias, fix min-concurrency clampingTest plan
cargo check -p acp-natspassescargo test -p acp-nats— all 492 tests passcargo clippy -p acp-natsclean