Skip to content

Commit 4502cf5

Browse files
cephalonautoz-agent
andcommitted
Reapply "Replay agent events on restore" (#25055)
Reverts warpdotdev/warp-internal#25055 Co-Authored-By: Oz <oz-agent@warp.dev>
1 parent e20fa7a commit 4502cf5

22 files changed

Lines changed: 1131 additions & 1 deletion

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

app/src/ai/agent/api/convert_conversation.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ pub fn convert_conversation_data_to_ai_conversation(
8484
parent_conversation_id: None,
8585
run_id: None,
8686
autoexecute_override: None,
87+
last_event_sequence: None,
8788
},
8889
RestorationMode::Continue => AgentConversationData {
8990
server_conversation_token: Some(
@@ -102,6 +103,7 @@ pub fn convert_conversation_data_to_ai_conversation(
102103
// dispatch time; adding it here would avoid a round-trip to StreamInit.
103104
run_id: None,
104105
autoexecute_override: None,
106+
last_event_sequence: None,
105107
},
106108
};
107109

app/src/ai/agent/conversation.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,11 @@ pub struct AIConversation {
223223
/// these conversations — the remote worker's own client handles status
224224
/// reporting. TaskStatusSyncModel skips status updates for these.
225225
is_remote_child: bool,
226+
227+
/// The last event sequence number observed from the v2 orchestration
228+
/// event log. Used on restore to resume event delivery without
229+
/// re-delivering already-processed events.
230+
last_event_sequence: Option<i64>,
226231
}
227232

228233
pub(crate) fn artifact_from_fork_proto(
@@ -272,6 +277,7 @@ impl AIConversation {
272277
agent_name: None,
273278
parent_conversation_id: None,
274279
is_remote_child: false,
280+
last_event_sequence: None,
275281
}
276282
}
277283

@@ -351,6 +357,7 @@ impl AIConversation {
351357
parent_conversation_id,
352358
run_id,
353359
autoexecute_override,
360+
last_event_sequence,
354361
) = if let Some(data) = conversation_data {
355362
let server_conversation_token = data
356363
.server_conversation_token
@@ -381,6 +388,7 @@ impl AIConversation {
381388
} else {
382389
AIConversationAutoexecuteMode::default()
383390
};
391+
let last_event_sequence = data.last_event_sequence;
384392

385393
(
386394
server_conversation_token,
@@ -393,6 +401,7 @@ impl AIConversation {
393401
parent_conversation_id,
394402
run_id,
395403
autoexecute_override,
404+
last_event_sequence,
396405
)
397406
} else {
398407
(
@@ -406,6 +415,7 @@ impl AIConversation {
406415
None,
407416
None,
408417
AIConversationAutoexecuteMode::default(),
418+
None,
409419
)
410420
};
411421

@@ -448,6 +458,7 @@ impl AIConversation {
448458
agent_name,
449459
parent_conversation_id,
450460
is_remote_child: false,
461+
last_event_sequence,
451462
})
452463
}
453464

@@ -786,6 +797,16 @@ impl AIConversation {
786797
self.parent_conversation_id = Some(id);
787798
}
788799

800+
/// Returns the last observed v2 orchestration event sequence number, if any.
801+
pub fn last_event_sequence(&self) -> Option<i64> {
802+
self.last_event_sequence
803+
}
804+
805+
/// Updates the last observed v2 orchestration event sequence number.
806+
pub fn set_last_event_sequence(&mut self, sequence: i64) {
807+
self.last_event_sequence = Some(sequence);
808+
}
809+
789810
/// Returns true if this conversation was spawned by a parent orchestrator agent.
790811
pub fn is_child_agent_conversation(&self) -> bool {
791812
self.parent_conversation_id.is_some()
@@ -2811,6 +2832,7 @@ impl AIConversation {
28112832
parent_conversation_id: self.parent_conversation_id.map(|id| id.to_string()),
28122833
run_id: self.task_id.map(|id| id.to_string()),
28132834
autoexecute_override: Some(self.autoexecute_override.into()),
2835+
last_event_sequence: self.last_event_sequence,
28142836
},
28152837
};
28162838
ctx.spawn(

app/src/ai/agent_conversations_model_tests.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ fn create_test_task(
6060
agent_config_snapshot: None,
6161
artifacts: vec![],
6262
is_sandbox_running: false,
63+
last_event_sequence: None,
64+
children: vec![],
6365
}
6466
}
6567

@@ -152,6 +154,7 @@ fn test_display_status_uses_matching_conversation_for_in_progress_task() {
152154
parent_conversation_id: None,
153155
run_id: Some(task_id.clone()),
154156
autoexecute_override: None,
157+
last_event_sequence: None,
155158
},
156159
);
157160

@@ -203,6 +206,7 @@ fn test_display_status_updates_when_blocked_conversation_resumes() {
203206
parent_conversation_id: None,
204207
run_id: Some(task_id.clone()),
205208
autoexecute_override: None,
209+
last_event_sequence: None,
206210
},
207211
);
208212

@@ -278,6 +282,7 @@ fn test_display_status_terminal_task_state_overrides_matching_conversation() {
278282
parent_conversation_id: None,
279283
run_id: Some(task_id.clone()),
280284
autoexecute_override: None,
285+
last_event_sequence: None,
281286
},
282287
);
283288

@@ -329,6 +334,7 @@ fn test_status_filter_uses_display_status_for_task_backed_conversations() {
329334
parent_conversation_id: None,
330335
run_id: Some(task_id.clone()),
331336
autoexecute_override: None,
337+
last_event_sequence: None,
332338
},
333339
);
334340

@@ -763,6 +769,7 @@ fn test_get_tasks_and_conversations_prefers_task_when_task_id_matches_conversati
763769
parent_conversation_id: None,
764770
run_id: Some(task_id.clone()),
765771
autoexecute_override: None,
772+
last_event_sequence: None,
766773
},
767774
);
768775

@@ -819,6 +826,7 @@ fn test_get_tasks_and_conversations_prefers_task_when_server_token_matches() {
819826
parent_conversation_id: None,
820827
run_id: None,
821828
autoexecute_override: None,
829+
last_event_sequence: None,
822830
},
823831
);
824832

@@ -874,6 +882,7 @@ fn test_get_tasks_and_conversations_keeps_unrelated_tasks_and_conversations() {
874882
parent_conversation_id: None,
875883
run_id: None,
876884
autoexecute_override: None,
885+
last_event_sequence: None,
877886
},
878887
);
879888

app/src/ai/ambient_agents/spawn_tests.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ fn task_with(
3434
agent_config_snapshot: None,
3535
artifacts: vec![],
3636
is_sandbox_running: true,
37+
last_event_sequence: None,
38+
children: vec![],
3739
}
3840
}
3941

app/src/ai/ambient_agents/task.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,20 @@ pub struct AmbientAgentTask {
262262
pub agent_config_snapshot: Option<AgentConfigSnapshot>,
263263
#[serde(default, deserialize_with = "deserialize_artifacts")]
264264
pub artifacts: Vec<Artifact>,
265+
266+
/// The last event sequence number recorded for this run by the server.
267+
/// Used by orchestration event delivery to resume from the correct
268+
/// cursor on restart. Populated by `GET /agent/runs/{run_id}` when the
269+
/// server supports it; `None` on older servers.
270+
#[serde(default)]
271+
pub last_event_sequence: Option<i64>,
272+
273+
/// The server-recorded `run_id`s of direct children of this run. Used
274+
/// by orchestration event-delivery restore to discover children whose
275+
/// records may not exist locally (e.g. remote-worker children in the
276+
/// driver case). Empty on older servers.
277+
#[serde(default)]
278+
pub children: Vec<String>,
265279
}
266280

267281
/// Represents a single attachment input from the client (e.g., file upload)

app/src/ai/blocklist/history_model.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,23 @@ impl BlocklistAIHistoryModel {
452452
.unwrap_or_default()
453453
}
454454

455+
/// Updates the persisted `last_event_sequence` for a conversation and
456+
/// writes the updated conversation state to SQLite. Used by the
457+
/// orchestration event poller after draining an event batch to keep the
458+
/// cursor durable across restarts.
459+
pub fn update_event_sequence(
460+
&mut self,
461+
conversation_id: AIConversationId,
462+
sequence: i64,
463+
ctx: &mut ModelContext<Self>,
464+
) {
465+
let Some(conversation) = self.conversations_by_id.get_mut(&conversation_id) else {
466+
return;
467+
};
468+
conversation.set_last_event_sequence(sequence);
469+
conversation.write_updated_conversation_state(ctx);
470+
}
471+
455472
/// Sets a live conversation's server token, and updates the mapping in the history
456473
/// model.
457474
pub fn set_server_conversation_token_for_conversation(
@@ -1075,6 +1092,9 @@ impl BlocklistAIHistoryModel {
10751092
parent_conversation_id: None,
10761093
run_id: None,
10771094
autoexecute_override: Some(source_conversation.autoexecute_override().into()),
1095+
// The event cursor belongs to the source conversation's run; the
1096+
// forked conversation will establish its own cursor.
1097+
last_event_sequence: None,
10781098
};
10791099
let forked_conversation_id = AIConversationId::new();
10801100
if let Err(e) = sqlite_sender.send(ModelEvent::UpdateMultiAgentConversation {
@@ -1227,6 +1247,9 @@ impl BlocklistAIHistoryModel {
12271247
parent_conversation_id: None,
12281248
run_id: None,
12291249
autoexecute_override: Some(conversation.autoexecute_override().into()),
1250+
// The event cursor belongs to the source conversation's run; the
1251+
// forked conversation will establish its own cursor.
1252+
last_event_sequence: None,
12301253
};
12311254

12321255
let forked_conversation_id = AIConversationId::new();

app/src/ai/blocklist/history_model_test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1132,6 +1132,7 @@ fn test_find_by_token_after_insert_forked_conversation_from_tasks() {
11321132
parent_conversation_id: None,
11331133
run_id: None,
11341134
autoexecute_override: None,
1135+
last_event_sequence: None,
11351136
};
11361137
let tasks = vec![warp_multi_agent_api::Task {
11371138
id: "root-task".to_string(),

0 commit comments

Comments
 (0)