Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ jobs:

- name: Run E2E tests
run: |
pytest tests/e2e/ -v -x --timeout=120
pytest tests/e2e/ -v --timeout=120
env:
RUST_LOG: ironclaw=info
RUST_BACKTRACE: "1"
Expand Down
64 changes: 30 additions & 34 deletions src/agent/agent_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,10 @@ pub struct Agent {
pub(super) heartbeat_config: Option<HeartbeatConfig>,
pub(super) hygiene_config: Option<crate::config::HygieneConfig>,
pub(super) routine_config: Option<RoutineConfig>,
/// Optional slot to expose the routine engine to the gateway for manual triggering.
/// Shared routine-engine slot used for internal event matching and for exposing
/// the engine to gateway/manual trigger entry points.
pub(super) routine_engine_slot:
Option<Arc<tokio::sync::RwLock<Option<Arc<crate::agent::routine_engine::RoutineEngine>>>>>,
Arc<tokio::sync::RwLock<Option<Arc<crate::agent::routine_engine::RoutineEngine>>>>,
}

impl Agent {
Expand Down Expand Up @@ -228,16 +229,21 @@ impl Agent {
heartbeat_config,
hygiene_config,
routine_config,
routine_engine_slot: None,
routine_engine_slot: Arc::new(tokio::sync::RwLock::new(None)),
}
}

/// Set the routine engine slot for exposing the engine to the gateway.
/// Replace the routine-engine slot with a shared one so the gateway and
/// agent reference the same engine.
pub fn set_routine_engine_slot(
&mut self,
slot: Arc<tokio::sync::RwLock<Option<Arc<crate::agent::routine_engine::RoutineEngine>>>>,
) {
self.routine_engine_slot = Some(slot);
self.routine_engine_slot = slot;
}

/// Snapshot of the routine engine currently stored in the shared slot.
///
/// Takes the slot's read lock, clones the inner `Arc` (if one is present)
/// and releases the lock on return, so callers never hold the lock while
/// using the engine. Yields `None` when the slot is empty.
async fn routine_engine(&self) -> Option<Arc<crate::agent::routine_engine::RoutineEngine>> {
    let guard = self.routine_engine_slot.read().await;
    guard.as_ref().map(Arc::clone)
}

// Convenience accessors
Expand Down Expand Up @@ -633,9 +639,7 @@ impl Agent {
// via a local to use in the message loop below.

// Expose engine to gateway for manual triggering
if let Some(ref slot) = self.routine_engine_slot {
*slot.write().await = Some(Arc::clone(&engine));
}
*self.routine_engine_slot.write().await = Some(Arc::clone(&engine));

tracing::debug!(
"Routines enabled: cron ticker every {}s, max {} concurrent",
Expand All @@ -655,9 +659,6 @@ impl Agent {
None
};

// Extract engine ref for use in message loop
let routine_engine_for_loop = routine_handle.as_ref().map(|(_, e)| Arc::clone(e));

// Main message loop
tracing::debug!("Agent {} ready and listening", self.config.name);

Expand Down Expand Up @@ -693,29 +694,6 @@ impl Agent {
// Store successfully extracted document text in workspace for indexing
self.store_extracted_documents(&message).await;

// Event-triggered routines consume plain user input before it enters
// the normal chat/tool pipeline. This avoids a duplicate turn where
// the main agent responds and the routine also fires on the same
// inbound message.
if !message.is_internal
&& matches!(
SubmissionParser::parse(&message.content),
Submission::UserInput { .. }
)
&& let Some(ref engine) = routine_engine_for_loop
{
let fired = engine.check_event_triggers(&message).await;
if fired > 0 {
tracing::debug!(
channel = %message.channel,
user = %message.user_id,
fired,
"Consumed inbound user message with matching event-triggered routine(s)"
);
continue;
}
}

match self.handle_message(&message).await {
Ok(Some(response)) if !response.is_empty() => {
// Hook: BeforeOutbound — allow hooks to modify or suppress outbound
Expand Down Expand Up @@ -1032,6 +1010,24 @@ impl Agent {
message.content.len()
);

if !message.is_internal
&& let Submission::UserInput { ref content } = submission
&& let Some(engine) = self.routine_engine().await
{
let fired = engine
.check_event_triggers(&message.user_id, &message.channel, content)
.await;
if fired > 0 {
tracing::debug!(
channel = %message.channel,
user = %message.user_id,
fired,
"Consumed inbound user message with matching event-triggered routine(s)"
);
return Ok(Some(String::new()));
}
}

// Process based on submission type
let result = match submission {
Submission::UserInput { content } => {
Expand Down
122 changes: 107 additions & 15 deletions src/agent/routine_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::agent::Scheduler;
use crate::agent::routine::{
NotifyConfig, Routine, RoutineAction, RoutineRun, RunStatus, Trigger, next_cron_fire,
};
use crate::channels::{IncomingMessage, OutgoingResponse};
use crate::channels::OutgoingResponse;
use crate::config::RoutineConfig;
use crate::context::JobContext;
use crate::db::Database;
Expand Down Expand Up @@ -88,6 +88,12 @@ impl RoutineEngine {
}
}

/// Expose the running count for integration tests.
///
/// Returns a borrow of the engine's shared `running_count` atomic so a test
/// can read it (or clone the `Arc`) without going through the engine's API.
/// Marked `#[doc(hidden)]` so it stays out of the rendered documentation;
/// not intended for production callers.
#[doc(hidden)]
pub fn running_count_for_test(&self) -> &Arc<AtomicUsize> {
&self.running_count
}

/// Refresh the in-memory event trigger cache from DB.
pub async fn refresh_event_cache(&self) {
match self.store.list_event_routines().await {
Expand Down Expand Up @@ -135,9 +141,9 @@ impl RoutineEngine {

/// Check incoming message against event triggers. Returns number of routines fired.
///
/// Called synchronously from the main loop after handle_message(). The actual
/// execution is spawned async so this returns quickly.
pub async fn check_event_triggers(&self, message: &IncomingMessage) -> usize {
/// Accepts only the three fields needed for matching (user scope, channel,
/// message content) so callers never need to clone a full `IncomingMessage`.
pub async fn check_event_triggers(&self, user_id: &str, channel: &str, content: &str) -> usize {
let cache = self.event_cache.read().await;
let mut fired = 0;

Expand Down Expand Up @@ -173,21 +179,21 @@ impl RoutineEngine {
EventMatcher::System { .. } => continue,
};

if routine.user_id != message.user_id {
if routine.user_id != user_id {
continue;
}

// Channel filter
if let Trigger::Event {
channel: Some(ch), ..
} = &routine.trigger
&& ch != &message.channel
&& ch != channel
{
continue;
}

// Regex match
if !re.is_match(&message.content) {
if !re.is_match(content) {
continue;
}

Expand All @@ -210,7 +216,7 @@ impl RoutineEngine {
continue;
}

let detail = truncate(&message.content, 200);
let detail = truncate(content, 200);
self.spawn_fire(routine.clone(), "event", Some(detail));
fired += 1;
}
Expand Down Expand Up @@ -508,6 +514,88 @@ impl RoutineEngine {
}
}

/// Watches a dispatched full_job until the linked scheduler job completes.
///
/// Polls `store.get_job(job_id)` at a fixed interval until the job leaves
/// an active state (Pending/InProgress/Stuck). Maps the final `JobState` to
/// a `RunStatus` for the routine run.
struct FullJobWatcher {
/// Database handle used to look up the job's current state.
store: Arc<dyn Database>,
/// Identifier of the scheduler job being watched.
job_id: Uuid,
/// Name of the owning routine; attached to log events emitted while polling.
routine_name: String,
}

impl FullJobWatcher {
    /// Delay between two consecutive job-state lookups.
    const POLL_INTERVAL: Duration = Duration::from_secs(5);
    /// Maximum number of lookups before giving up: 24 hours' worth of
    /// `POLL_INTERVAL`-spaced polls.
    const MAX_POLLS: u32 = (24 * 60 * 60) / Self::POLL_INTERVAL.as_secs() as u32;

    /// Builds a watcher for `job_id`; `routine_name` is only used to tag logs.
    fn new(store: Arc<dyn Database>, job_id: Uuid, routine_name: String) -> Self {
        Self {
            store,
            job_id,
            routine_name,
        }
    }

    /// Waits (asynchronously) until the linked job is no longer active and
    /// returns the mapped run status together with a one-line summary.
    ///
    /// The wait ends with `RunStatus::Failed` when the job row is missing,
    /// when a lookup errors, or when `MAX_POLLS` lookups have all found the
    /// job still active.
    async fn wait_for_completion(&self) -> (RunStatus, Option<String>) {
        let mut polls_left = Self::MAX_POLLS;

        let outcome = loop {
            // Look first, sleep afterwards: a job that is already done
            // (e.g. one that failed fast) is finalized without any delay.
            if let Some(settled) = self.poll_once().await {
                break settled;
            }

            polls_left = polls_left.saturating_sub(1);
            if polls_left == 0 {
                tracing::error!(
                    routine = %self.routine_name,
                    job_id = %self.job_id,
                    "full_job timed out after 24 hours, treating as failed"
                );
                break RunStatus::Failed;
            }

            tokio::time::sleep(Self::POLL_INTERVAL).await;
        };

        let summary = format!("Job {} finished ({})", self.job_id, outcome);
        (outcome, Some(summary))
    }

    /// Performs a single lookup of the job.
    ///
    /// Returns `None` while the job is still active, otherwise the status the
    /// routine run should end with.
    async fn poll_once(&self) -> Option<RunStatus> {
        match self.store.get_job(self.job_id).await {
            Ok(Some(job_ctx)) if job_ctx.state.is_active() => None,
            Ok(Some(job_ctx)) => Some(Self::map_job_state(&job_ctx.state)),
            Ok(None) => {
                tracing::warn!(
                    routine = %self.routine_name,
                    job_id = %self.job_id,
                    "full_job disappeared from DB while polling"
                );
                Some(RunStatus::Failed)
            }
            Err(e) => {
                tracing::error!(
                    routine = %self.routine_name,
                    job_id = %self.job_id,
                    "Error polling full_job state: {}", e
                );
                Some(RunStatus::Failed)
            }
        }
    }

    /// Translates a non-active job state into a routine run status:
    /// `Failed` and `Cancelled` become `RunStatus::Failed`, every other
    /// state (Completed / Submitted / Accepted) becomes `RunStatus::Ok`.
    fn map_job_state(state: &crate::context::JobState) -> RunStatus {
        use crate::context::JobState;
        if matches!(state, JobState::Failed | JobState::Cancelled) {
            RunStatus::Failed
        } else {
            RunStatus::Ok
        }
    }
}

/// Shared context passed to the execution function.
struct EngineContext {
config: RoutineConfig,
Expand Down Expand Up @@ -682,8 +770,10 @@ fn sanitize_routine_name(name: &str) -> String {
///
/// Dispatch-and-watch: creates a job via `Scheduler::dispatch_job` (which handles
/// creation, metadata, persistence, and scheduling), links the routine run to
/// the job, and returns immediately. The job runs independently via the
/// existing Worker/Scheduler with full tool access.
/// the job, then watches it via `FullJobWatcher` until it reaches a
/// non-active state (not Pending/InProgress/Stuck). Returns the final
/// `RunStatus` mapped from the job outcome. This keeps the routine run
/// active for the full job lifetime so concurrency guardrails apply.
async fn execute_full_job(
ctx: &EngineContext,
routine: &Routine,
Expand Down Expand Up @@ -738,13 +828,15 @@ async fn execute_full_job(
routine = %routine.name,
job_id = %job_id,
max_iterations = max_iterations,
"Dispatched full job for routine"
"Dispatched full job for routine, watching for completion"
);

let summary = format!(
"Dispatched job {job_id} for full execution with tool access (max_iterations: {max_iterations})"
);
Ok((RunStatus::Ok, Some(summary), None))
// Watch the job until it finishes — keeps the routine run active
// so concurrency guardrails (running_count, routine_runs status)
// remain enforced for the full job lifetime.
let watcher = FullJobWatcher::new(ctx.store.clone(), job_id, routine.name.clone());
let (status, summary) = watcher.wait_for_completion().await;
Ok((status, summary, None))
}

/// Execute a lightweight routine with optional tool support.
Expand Down
Loading
Loading