Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 52 additions & 32 deletions src/agent/routine_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
//! Lightweight routines execute inline (single LLM call, no scheduler slot).
//! Full-job routines are delegated to the existing `Scheduler`.

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
Expand All @@ -31,11 +32,11 @@ use crate::error::RoutineError;
use crate::llm::{
ChatMessage, CompletionRequest, FinishReason, LlmProvider, ToolCall, ToolCompletionRequest,
};
use crate::safety::SafetyLayer;
use crate::tools::{
ApprovalContext, ApprovalRequirement, ToolError, ToolRegistry, prepare_tool_params,
};
use crate::workspace::Workspace;
use ironclaw_safety::SafetyLayer;

enum EventMatcher {
Message { routine: Routine, regex: Regex },
Expand Down Expand Up @@ -145,6 +146,15 @@ impl RoutineEngine {
/// message content) so callers never need to clone a full `IncomingMessage`.
pub async fn check_event_triggers(&self, user_id: &str, channel: &str, content: &str) -> usize {
let cache = self.event_cache.read().await;

// Early return if there are no message matchers at all.
if !cache
.iter()
.any(|m| matches!(m, EventMatcher::Message { .. }))
{
return 0;
}

let mut fired = 0;

// Collect routine IDs for batch query
Expand All @@ -161,16 +171,9 @@ impl RoutineEngine {
}

// Single batch query instead of N queries
let concurrent_counts = match self
.store
.count_running_routine_runs_batch(&routine_ids)
.await
{
Ok(counts) => counts,
Err(e) => {
tracing::error!("Failed to batch-load concurrent counts: {}", e);
return 0;
}
let concurrent_counts = match self.batch_concurrent_counts(&routine_ids).await {
Some(counts) => counts,
None => return 0,
};

for matcher in cache.iter() {
Expand Down Expand Up @@ -235,6 +238,15 @@ impl RoutineEngine {
user_id: Option<&str>,
) -> usize {
let cache = self.event_cache.read().await;

// Early return if there are no system-event matchers at all.
if !cache
.iter()
.any(|m| matches!(m, EventMatcher::System { .. }))
{
return 0;
}

let mut fired = 0;

// Collect routine IDs for batch query
Expand All @@ -251,19 +263,9 @@ impl RoutineEngine {
}

// Single batch query instead of N queries
let concurrent_counts = match self
.store
.count_running_routine_runs_batch(&routine_ids)
.await
{
Ok(counts) => counts,
Err(e) => {
tracing::error!(
"Failed to batch-load concurrent counts for system events: {}",
e
);
return 0;
}
let concurrent_counts = match self.batch_concurrent_counts(&routine_ids).await {
Some(counts) => counts,
None => return 0,
};

for matcher in cache.iter() {
Expand Down Expand Up @@ -337,6 +339,23 @@ impl RoutineEngine {
fired
}

/// Batch-load concurrent run counts for a set of routine IDs.
///
/// Returns `None` on database error (already logged).
async fn batch_concurrent_counts(&self, routine_ids: &[Uuid]) -> Option<HashMap<Uuid, i64>> {
match self
.store
.count_running_routine_runs_batch(routine_ids)
.await
{
Ok(counts) => Some(counts),
Err(e) => {
tracing::error!("Failed to batch-load concurrent counts: {}", e);
None
}
Comment on lines +345 to +355
}
}

/// Check all due cron routines and fire them. Called by the cron ticker.
pub async fn check_cron_triggers(&self) {
let routines = match self.store.list_due_cron_routines().await {
Expand Down Expand Up @@ -1041,8 +1060,8 @@ fn handle_text_response(
};
}

// Check for the "nothing to do" sentinel
if content == "ROUTINE_OK" || content.contains("ROUTINE_OK") {
// Check for the "nothing to do" sentinel (exact match on trimmed content).
if content == "ROUTINE_OK" {
let total_tokens = Some((total_input_tokens + total_output_tokens) as i32);
return Ok((RunStatus::Ok, None, total_tokens));
}
Expand Down Expand Up @@ -1582,20 +1601,21 @@ mod tests {

#[test]
fn test_routine_sentinel_detection_exact_match() {
// The execute_lightweight_no_tools checks: content == "ROUTINE_OK" || content.contains("ROUTINE_OK")
// After trim(), whitespace is removed
// Sentinel detection uses exact match on trimmed content to avoid
// false positives from substrings like "NOT_ROUTINE_OK".
let test_cases = vec![
("ROUTINE_OK", true),
(" ROUTINE_OK ", true), // After trim, whitespace is removed so matches
("something ROUTINE_OK something", true),
("ROUTINE_OK is done", true),
("done ROUTINE_OK", true),
("something ROUTINE_OK something", false), // substring no longer matches
("ROUTINE_OK is done", false), // substring no longer matches
("done ROUTINE_OK", false), // substring no longer matches
("NOT_ROUTINE_OK", false), // exact match prevents this
("no sentinel here", false),
];

for (content, should_match) in test_cases {
let trimmed = content.trim();
let matches = trimmed == "ROUTINE_OK" || trimmed.contains("ROUTINE_OK");
let matches = trimmed == "ROUTINE_OK";
assert_eq!(
matches, should_match,
"Content '{}' sentinel detection should be {}, got {}",
Expand Down
8 changes: 5 additions & 3 deletions src/tools/builtin/routine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
//! - `event_emit` - Emit a structured system event to `system_event`-triggered routines

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::{Arc, OnceLock};
use std::time::Duration;

use async_trait::async_trait;
Expand Down Expand Up @@ -624,7 +624,8 @@ pub(crate) fn routine_create_parameters_schema() -> Value {
}

fn routine_create_discovery_schema() -> Value {
routine_create_schema(true)
static CACHE: OnceLock<Value> = OnceLock::new();
CACHE.get_or_init(|| routine_create_schema(true)).clone()
}

pub(crate) fn routine_update_parameters_schema() -> Value {
Expand Down Expand Up @@ -1007,7 +1008,8 @@ pub(crate) fn event_emit_parameters_schema() -> Value {
}

fn event_emit_discovery_schema() -> Value {
event_emit_schema(true)
static CACHE: OnceLock<Value> = OnceLock::new();
CACHE.get_or_init(|| event_emit_schema(true)).clone()
}

fn parse_event_emit_args(params: &Value) -> Result<(String, String, Value), ToolError> {
Expand Down
Loading