@@ -19,21 +19,53 @@ use async_trait::async_trait;
1919use crate :: agent:: agentic_loop:: {
2020 AgenticLoopConfig , LoopDelegate , LoopOutcome , LoopSignal , TextAction ,
2121} ;
22- use crate :: llm:: { ChatMessage , Reasoning , ReasoningContext } ;
22+ use crate :: llm:: { ChatMessage , Reasoning , ReasoningContext , TokenUsage } ;
2323use crate :: tools:: permissions:: { PermissionState , effective_permission} ;
2424use crate :: tools:: redact_params;
2525
2626/// Result of the agentic loop execution.
2727pub ( super ) enum AgenticLoopResult {
2828 /// Completed with a response.
29- Response ( String ) ,
29+ Response {
30+ text : String ,
31+ turn_usage : TurnUsageSummary ,
32+ } ,
3033 /// A tool requires approval before continuing.
3134 NeedApproval {
3235 /// The pending approval request to store.
3336 pending : Box < PendingApproval > ,
37+ /// Usage accumulated before the turn paused for approval.
38+ turn_usage : TurnUsageSummary ,
39+ } ,
40+ /// The loop failed after spending usage in the current turn.
41+ Failed {
42+ error : Error ,
43+ turn_usage : TurnUsageSummary ,
3444 } ,
3545}
3646
47+ #[ derive( Debug , Clone , Default ) ]
48+ pub ( super ) struct TurnUsageSummary {
49+ pub usage : TokenUsage ,
50+ pub cost_usd : rust_decimal:: Decimal ,
51+ }
52+
53+ impl TurnUsageSummary {
54+ fn record_llm_call ( & mut self , usage : TokenUsage , cost_usd : rust_decimal:: Decimal ) {
55+ self . usage . input_tokens = self . usage . input_tokens . saturating_add ( usage. input_tokens ) ;
56+ self . usage . output_tokens = self . usage . output_tokens . saturating_add ( usage. output_tokens ) ;
57+ self . usage . cache_read_input_tokens = self
58+ . usage
59+ . cache_read_input_tokens
60+ . saturating_add ( usage. cache_read_input_tokens ) ;
61+ self . usage . cache_creation_input_tokens = self
62+ . usage
63+ . cache_creation_input_tokens
64+ . saturating_add ( usage. cache_creation_input_tokens ) ;
65+ self . cost_usd += cost_usd;
66+ }
67+ }
68+
3769impl Agent {
3870 /// Run the agentic loop: call LLM, execute tools, repeat until text response.
3971 ///
@@ -200,6 +232,7 @@ impl Agent {
200232 nudge_at,
201233 force_text_at,
202234 user_tz,
235+ turn_usage : std:: sync:: Mutex :: new ( TurnUsageSummary :: default ( ) ) ,
203236 cached_tool_permissions : std:: sync:: Mutex :: new ( None ) ,
204237 } ;
205238
@@ -242,26 +275,41 @@ impl Agent {
242275 & mut reason_ctx,
243276 & loop_config,
244277 )
245- . await ?;
278+ . await ;
279+
280+ let turn_usage = delegate. turn_usage_summary ( ) ;
246281
247282 match outcome {
248- LoopOutcome :: Response ( text) => Ok ( AgenticLoopResult :: Response ( text) ) ,
249- LoopOutcome :: Stopped => Err ( crate :: error:: JobError :: ContextError {
250- id : thread_id,
251- reason : "Interrupted" . to_string ( ) ,
252- }
253- . into ( ) ) ,
254- LoopOutcome :: MaxIterations => Err ( crate :: error:: LlmError :: InvalidResponse {
255- provider : "agent" . to_string ( ) ,
256- reason : format ! ( "Exceeded maximum tool iterations ({max_tool_iterations})" ) ,
257- }
258- . into ( ) ) ,
259- LoopOutcome :: Failure ( reason) => Err ( crate :: error:: LlmError :: InvalidResponse {
260- provider : "agent" . to_string ( ) ,
261- reason,
262- }
263- . into ( ) ) ,
264- LoopOutcome :: NeedApproval ( pending) => Ok ( AgenticLoopResult :: NeedApproval { pending } ) ,
283+ Ok ( LoopOutcome :: Response ( text) ) => Ok ( AgenticLoopResult :: Response { text, turn_usage } ) ,
284+ Ok ( LoopOutcome :: Stopped ) => Ok ( AgenticLoopResult :: Failed {
285+ error : crate :: error:: JobError :: ContextError {
286+ id : thread_id,
287+ reason : "Interrupted" . to_string ( ) ,
288+ }
289+ . into ( ) ,
290+ turn_usage,
291+ } ) ,
292+ Ok ( LoopOutcome :: MaxIterations ) => Ok ( AgenticLoopResult :: Failed {
293+ error : crate :: error:: LlmError :: InvalidResponse {
294+ provider : "agent" . to_string ( ) ,
295+ reason : format ! ( "Exceeded maximum tool iterations ({max_tool_iterations})" ) ,
296+ }
297+ . into ( ) ,
298+ turn_usage,
299+ } ) ,
300+ Ok ( LoopOutcome :: Failure ( reason) ) => Ok ( AgenticLoopResult :: Failed {
301+ error : crate :: error:: LlmError :: InvalidResponse {
302+ provider : "agent" . to_string ( ) ,
303+ reason,
304+ }
305+ . into ( ) ,
306+ turn_usage,
307+ } ) ,
308+ Ok ( LoopOutcome :: NeedApproval ( pending) ) => Ok ( AgenticLoopResult :: NeedApproval {
309+ pending,
310+ turn_usage,
311+ } ) ,
312+ Err ( error) => Ok ( AgenticLoopResult :: Failed { error, turn_usage } ) ,
265313 }
266314 }
267315
@@ -295,10 +343,32 @@ struct ChatDelegate<'a> {
295343 nudge_at : usize ,
296344 force_text_at : usize ,
297345 user_tz : chrono_tz:: Tz ,
346+ turn_usage : std:: sync:: Mutex < TurnUsageSummary > ,
298347 cached_tool_permissions :
299348 std:: sync:: Mutex < Option < std:: collections:: HashMap < String , PermissionState > > > ,
300349}
301350
351+ impl ChatDelegate < ' _ > {
352+ fn turn_usage_summary ( & self ) -> TurnUsageSummary {
353+ self . with_turn_usage ( |turn_usage| turn_usage. clone ( ) )
354+ }
355+
356+ fn record_turn_usage ( & self , usage : TokenUsage , cost_usd : rust_decimal:: Decimal ) {
357+ self . with_turn_usage ( |turn_usage| turn_usage. record_llm_call ( usage, cost_usd) ) ;
358+ }
359+
360+ fn with_turn_usage < R > ( & self , f : impl FnOnce ( & mut TurnUsageSummary ) -> R ) -> R {
361+ match self . turn_usage . lock ( ) {
362+ Ok ( mut turn_usage) => f ( & mut turn_usage) ,
363+ Err ( poisoned) => {
364+ tracing:: warn!( "turn usage mutex poisoned; recovering accumulated usage" ) ;
365+ let mut turn_usage = poisoned. into_inner ( ) ;
366+ f ( & mut turn_usage)
367+ }
368+ }
369+ }
370+ }
371+
302372#[ async_trait]
303373impl < ' a > LoopDelegate for ChatDelegate < ' a > {
304374 async fn check_signals ( & self ) -> LoopSignal {
@@ -588,6 +658,8 @@ impl<'a> LoopDelegate for ChatDelegate<'a> {
588658 }
589659 }
590660
661+ self . record_turn_usage ( output. usage , call_cost) ;
662+
591663 Ok ( output)
592664 }
593665
@@ -1478,6 +1550,48 @@ mod tests {
14781550 }
14791551 }
14801552
1553+ struct FixedUsageTextProvider ;
1554+
1555+ #[ async_trait]
1556+ impl LlmProvider for FixedUsageTextProvider {
1557+ fn model_name ( & self ) -> & str {
1558+ "fixed-usage"
1559+ }
1560+
1561+ fn cost_per_token ( & self ) -> ( Decimal , Decimal ) {
1562+ ( Decimal :: new ( 1 , 3 ) , Decimal :: new ( 2 , 3 ) )
1563+ }
1564+
1565+ async fn complete (
1566+ & self ,
1567+ _request : CompletionRequest ,
1568+ ) -> Result < CompletionResponse , crate :: error:: LlmError > {
1569+ Ok ( CompletionResponse {
1570+ content : "done" . to_string ( ) ,
1571+ input_tokens : 12 ,
1572+ output_tokens : 3 ,
1573+ finish_reason : FinishReason :: Stop ,
1574+ cache_read_input_tokens : 0 ,
1575+ cache_creation_input_tokens : 0 ,
1576+ } )
1577+ }
1578+
1579+ async fn complete_with_tools (
1580+ & self ,
1581+ _request : ToolCompletionRequest ,
1582+ ) -> Result < ToolCompletionResponse , crate :: error:: LlmError > {
1583+ Ok ( ToolCompletionResponse {
1584+ content : Some ( "done" . to_string ( ) ) ,
1585+ tool_calls : Vec :: new ( ) ,
1586+ input_tokens : 12 ,
1587+ output_tokens : 3 ,
1588+ finish_reason : FinishReason :: Stop ,
1589+ cache_read_input_tokens : 0 ,
1590+ cache_creation_input_tokens : 0 ,
1591+ } )
1592+ }
1593+ }
1594+
14811595 /// Build a minimal `Agent` for unit testing (no DB, no workspace, no extensions).
14821596 fn make_test_agent ( ) -> Agent {
14831597 let deps = AgentDeps {
@@ -2587,12 +2701,61 @@ mod tests {
25872701
25882702 // Verify we got a text response.
25892703 match inner. unwrap ( ) {
2590- super :: AgenticLoopResult :: Response ( text) => {
2704+ super :: AgenticLoopResult :: Response { text, .. } => {
25912705 assert ! ( !text. is_empty( ) , "Expected non-empty forced text response" ) ;
25922706 }
25932707 super :: AgenticLoopResult :: NeedApproval { .. } => {
25942708 panic ! ( "Expected text response, got NeedApproval" ) ;
25952709 }
2710+ super :: AgenticLoopResult :: Failed { error, .. } => {
2711+ panic ! ( "Expected text response, got Failed: {error}" ) ;
2712+ }
2713+ }
2714+ }
2715+
2716+ #[ tokio:: test]
2717+ async fn test_dispatcher_response_usage_is_per_turn_not_cumulative ( ) {
2718+ use crate :: agent:: session:: Session ;
2719+ use crate :: channels:: IncomingMessage ;
2720+ use crate :: llm:: ChatMessage ;
2721+ use tokio:: sync:: Mutex ;
2722+
2723+ let agent = make_test_agent_with_llm ( Arc :: new ( FixedUsageTextProvider ) , 3 ) ;
2724+ let session = Arc :: new ( Mutex :: new ( Session :: new ( "test-user" ) ) ) ;
2725+ let thread_id = {
2726+ let mut sess = session. lock ( ) . await ;
2727+ sess. create_thread ( Some ( "test" ) ) . id
2728+ } ;
2729+ let tenant = agent. tenant_ctx ( "test-user" ) . await ;
2730+
2731+ for prompt in [ "first turn" , "second turn" ] {
2732+ let message = IncomingMessage :: new ( "test" , "test-user" , prompt) ;
2733+ let initial_messages = vec ! [ ChatMessage :: user( prompt) ] ;
2734+ let result = agent
2735+ . run_agentic_loop (
2736+ & message,
2737+ tenant. clone ( ) ,
2738+ session. clone ( ) ,
2739+ thread_id,
2740+ initial_messages,
2741+ )
2742+ . await
2743+ . expect ( "dispatcher run should succeed" ) ;
2744+
2745+ match result {
2746+ super :: AgenticLoopResult :: Response { text, turn_usage } => {
2747+ assert_eq ! ( text, "done" ) ;
2748+ assert_eq ! ( turn_usage. usage. input_tokens, 12 ) ;
2749+ assert_eq ! ( turn_usage. usage. output_tokens, 3 ) ;
2750+ assert_eq ! ( turn_usage. cost_usd, Decimal :: new( 18 , 3 ) ) ;
2751+ }
2752+ super :: AgenticLoopResult :: NeedApproval { .. } => {
2753+ panic ! ( "expected a text response" ) ;
2754+ }
2755+ super :: AgenticLoopResult :: Failed { error, .. } => {
2756+ panic ! ( "expected a text response, got Failed: {error}" ) ;
2757+ }
2758+ }
25962759 }
25972760 }
25982761
0 commit comments