Skip to content

Commit e7c8901

Browse files
authored
Fix turn cost footer and per-turn usage accounting (#1951)
* Fix turn cost footer and per-turn usage accounting * Avoid panicking on poisoned turn usage mutex * Tighten SSE usage regression coverage * Report usage for interrupted turns
1 parent 3004583 commit e7c8901

6 files changed

Lines changed: 409 additions & 82 deletions

File tree

src/agent/agent_loop.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -352,11 +352,6 @@ impl Agent {
352352
repo_url: Some("https://github.com/nearai/ironclaw".to_string()),
353353
}
354354
}
355-
356-
pub(super) fn cost_guard(&self) -> &Arc<crate::agent::cost_guard::CostGuard> {
357-
&self.deps.cost_guard
358-
}
359-
360355
/// Build a tenant-scoped execution context for the given user.
361356
///
362357
/// This is the standard entry point for per-user operations. The returned

src/agent/dispatcher.rs

Lines changed: 184 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,53 @@ use async_trait::async_trait;
1919
use crate::agent::agentic_loop::{
2020
AgenticLoopConfig, LoopDelegate, LoopOutcome, LoopSignal, TextAction,
2121
};
22-
use crate::llm::{ChatMessage, Reasoning, ReasoningContext};
22+
use crate::llm::{ChatMessage, Reasoning, ReasoningContext, TokenUsage};
2323
use crate::tools::permissions::{PermissionState, effective_permission};
2424
use crate::tools::redact_params;
2525

2626
/// Result of the agentic loop execution.
2727
pub(super) enum AgenticLoopResult {
2828
/// Completed with a response.
29-
Response(String),
29+
Response {
30+
text: String,
31+
turn_usage: TurnUsageSummary,
32+
},
3033
/// A tool requires approval before continuing.
3134
NeedApproval {
3235
/// The pending approval request to store.
3336
pending: Box<PendingApproval>,
37+
/// Usage accumulated before the turn paused for approval.
38+
turn_usage: TurnUsageSummary,
39+
},
40+
/// The loop failed after spending usage in the current turn.
41+
Failed {
42+
error: Error,
43+
turn_usage: TurnUsageSummary,
3444
},
3545
}
3646

47+
#[derive(Debug, Clone, Default)]
48+
pub(super) struct TurnUsageSummary {
49+
pub usage: TokenUsage,
50+
pub cost_usd: rust_decimal::Decimal,
51+
}
52+
53+
impl TurnUsageSummary {
54+
fn record_llm_call(&mut self, usage: TokenUsage, cost_usd: rust_decimal::Decimal) {
55+
self.usage.input_tokens = self.usage.input_tokens.saturating_add(usage.input_tokens);
56+
self.usage.output_tokens = self.usage.output_tokens.saturating_add(usage.output_tokens);
57+
self.usage.cache_read_input_tokens = self
58+
.usage
59+
.cache_read_input_tokens
60+
.saturating_add(usage.cache_read_input_tokens);
61+
self.usage.cache_creation_input_tokens = self
62+
.usage
63+
.cache_creation_input_tokens
64+
.saturating_add(usage.cache_creation_input_tokens);
65+
self.cost_usd += cost_usd;
66+
}
67+
}
68+
3769
impl Agent {
3870
/// Run the agentic loop: call LLM, execute tools, repeat until text response.
3971
///
@@ -200,6 +232,7 @@ impl Agent {
200232
nudge_at,
201233
force_text_at,
202234
user_tz,
235+
turn_usage: std::sync::Mutex::new(TurnUsageSummary::default()),
203236
cached_tool_permissions: std::sync::Mutex::new(None),
204237
};
205238

@@ -242,26 +275,41 @@ impl Agent {
242275
&mut reason_ctx,
243276
&loop_config,
244277
)
245-
.await?;
278+
.await;
279+
280+
let turn_usage = delegate.turn_usage_summary();
246281

247282
match outcome {
248-
LoopOutcome::Response(text) => Ok(AgenticLoopResult::Response(text)),
249-
LoopOutcome::Stopped => Err(crate::error::JobError::ContextError {
250-
id: thread_id,
251-
reason: "Interrupted".to_string(),
252-
}
253-
.into()),
254-
LoopOutcome::MaxIterations => Err(crate::error::LlmError::InvalidResponse {
255-
provider: "agent".to_string(),
256-
reason: format!("Exceeded maximum tool iterations ({max_tool_iterations})"),
257-
}
258-
.into()),
259-
LoopOutcome::Failure(reason) => Err(crate::error::LlmError::InvalidResponse {
260-
provider: "agent".to_string(),
261-
reason,
262-
}
263-
.into()),
264-
LoopOutcome::NeedApproval(pending) => Ok(AgenticLoopResult::NeedApproval { pending }),
283+
Ok(LoopOutcome::Response(text)) => Ok(AgenticLoopResult::Response { text, turn_usage }),
284+
Ok(LoopOutcome::Stopped) => Ok(AgenticLoopResult::Failed {
285+
error: crate::error::JobError::ContextError {
286+
id: thread_id,
287+
reason: "Interrupted".to_string(),
288+
}
289+
.into(),
290+
turn_usage,
291+
}),
292+
Ok(LoopOutcome::MaxIterations) => Ok(AgenticLoopResult::Failed {
293+
error: crate::error::LlmError::InvalidResponse {
294+
provider: "agent".to_string(),
295+
reason: format!("Exceeded maximum tool iterations ({max_tool_iterations})"),
296+
}
297+
.into(),
298+
turn_usage,
299+
}),
300+
Ok(LoopOutcome::Failure(reason)) => Ok(AgenticLoopResult::Failed {
301+
error: crate::error::LlmError::InvalidResponse {
302+
provider: "agent".to_string(),
303+
reason,
304+
}
305+
.into(),
306+
turn_usage,
307+
}),
308+
Ok(LoopOutcome::NeedApproval(pending)) => Ok(AgenticLoopResult::NeedApproval {
309+
pending,
310+
turn_usage,
311+
}),
312+
Err(error) => Ok(AgenticLoopResult::Failed { error, turn_usage }),
265313
}
266314
}
267315

@@ -295,10 +343,32 @@ struct ChatDelegate<'a> {
295343
nudge_at: usize,
296344
force_text_at: usize,
297345
user_tz: chrono_tz::Tz,
346+
turn_usage: std::sync::Mutex<TurnUsageSummary>,
298347
cached_tool_permissions:
299348
std::sync::Mutex<Option<std::collections::HashMap<String, PermissionState>>>,
300349
}
301350

351+
impl ChatDelegate<'_> {
352+
fn turn_usage_summary(&self) -> TurnUsageSummary {
353+
self.with_turn_usage(|turn_usage| turn_usage.clone())
354+
}
355+
356+
fn record_turn_usage(&self, usage: TokenUsage, cost_usd: rust_decimal::Decimal) {
357+
self.with_turn_usage(|turn_usage| turn_usage.record_llm_call(usage, cost_usd));
358+
}
359+
360+
fn with_turn_usage<R>(&self, f: impl FnOnce(&mut TurnUsageSummary) -> R) -> R {
361+
match self.turn_usage.lock() {
362+
Ok(mut turn_usage) => f(&mut turn_usage),
363+
Err(poisoned) => {
364+
tracing::warn!("turn usage mutex poisoned; recovering accumulated usage");
365+
let mut turn_usage = poisoned.into_inner();
366+
f(&mut turn_usage)
367+
}
368+
}
369+
}
370+
}
371+
302372
#[async_trait]
303373
impl<'a> LoopDelegate for ChatDelegate<'a> {
304374
async fn check_signals(&self) -> LoopSignal {
@@ -588,6 +658,8 @@ impl<'a> LoopDelegate for ChatDelegate<'a> {
588658
}
589659
}
590660

661+
self.record_turn_usage(output.usage, call_cost);
662+
591663
Ok(output)
592664
}
593665

@@ -1478,6 +1550,48 @@ mod tests {
14781550
}
14791551
}
14801552

1553+
struct FixedUsageTextProvider;
1554+
1555+
#[async_trait]
1556+
impl LlmProvider for FixedUsageTextProvider {
1557+
fn model_name(&self) -> &str {
1558+
"fixed-usage"
1559+
}
1560+
1561+
fn cost_per_token(&self) -> (Decimal, Decimal) {
1562+
(Decimal::new(1, 3), Decimal::new(2, 3))
1563+
}
1564+
1565+
async fn complete(
1566+
&self,
1567+
_request: CompletionRequest,
1568+
) -> Result<CompletionResponse, crate::error::LlmError> {
1569+
Ok(CompletionResponse {
1570+
content: "done".to_string(),
1571+
input_tokens: 12,
1572+
output_tokens: 3,
1573+
finish_reason: FinishReason::Stop,
1574+
cache_read_input_tokens: 0,
1575+
cache_creation_input_tokens: 0,
1576+
})
1577+
}
1578+
1579+
async fn complete_with_tools(
1580+
&self,
1581+
_request: ToolCompletionRequest,
1582+
) -> Result<ToolCompletionResponse, crate::error::LlmError> {
1583+
Ok(ToolCompletionResponse {
1584+
content: Some("done".to_string()),
1585+
tool_calls: Vec::new(),
1586+
input_tokens: 12,
1587+
output_tokens: 3,
1588+
finish_reason: FinishReason::Stop,
1589+
cache_read_input_tokens: 0,
1590+
cache_creation_input_tokens: 0,
1591+
})
1592+
}
1593+
}
1594+
14811595
/// Build a minimal `Agent` for unit testing (no DB, no workspace, no extensions).
14821596
fn make_test_agent() -> Agent {
14831597
let deps = AgentDeps {
@@ -2587,12 +2701,61 @@ mod tests {
25872701

25882702
// Verify we got a text response.
25892703
match inner.unwrap() {
2590-
super::AgenticLoopResult::Response(text) => {
2704+
super::AgenticLoopResult::Response { text, .. } => {
25912705
assert!(!text.is_empty(), "Expected non-empty forced text response");
25922706
}
25932707
super::AgenticLoopResult::NeedApproval { .. } => {
25942708
panic!("Expected text response, got NeedApproval");
25952709
}
2710+
super::AgenticLoopResult::Failed { error, .. } => {
2711+
panic!("Expected text response, got Failed: {error}");
2712+
}
2713+
}
2714+
}
2715+
2716+
#[tokio::test]
2717+
async fn test_dispatcher_response_usage_is_per_turn_not_cumulative() {
2718+
use crate::agent::session::Session;
2719+
use crate::channels::IncomingMessage;
2720+
use crate::llm::ChatMessage;
2721+
use tokio::sync::Mutex;
2722+
2723+
let agent = make_test_agent_with_llm(Arc::new(FixedUsageTextProvider), 3);
2724+
let session = Arc::new(Mutex::new(Session::new("test-user")));
2725+
let thread_id = {
2726+
let mut sess = session.lock().await;
2727+
sess.create_thread(Some("test")).id
2728+
};
2729+
let tenant = agent.tenant_ctx("test-user").await;
2730+
2731+
for prompt in ["first turn", "second turn"] {
2732+
let message = IncomingMessage::new("test", "test-user", prompt);
2733+
let initial_messages = vec![ChatMessage::user(prompt)];
2734+
let result = agent
2735+
.run_agentic_loop(
2736+
&message,
2737+
tenant.clone(),
2738+
session.clone(),
2739+
thread_id,
2740+
initial_messages,
2741+
)
2742+
.await
2743+
.expect("dispatcher run should succeed");
2744+
2745+
match result {
2746+
super::AgenticLoopResult::Response { text, turn_usage } => {
2747+
assert_eq!(text, "done");
2748+
assert_eq!(turn_usage.usage.input_tokens, 12);
2749+
assert_eq!(turn_usage.usage.output_tokens, 3);
2750+
assert_eq!(turn_usage.cost_usd, Decimal::new(18, 3));
2751+
}
2752+
super::AgenticLoopResult::NeedApproval { .. } => {
2753+
panic!("expected a text response");
2754+
}
2755+
super::AgenticLoopResult::Failed { error, .. } => {
2756+
panic!("expected a text response, got Failed: {error}");
2757+
}
2758+
}
25962759
}
25972760
}
25982761

0 commit comments

Comments
 (0)