Skip to content

Commit bc9ca7e

Browse files
cephalonautoz-agent
andcommitted
Remove orchestration_event_push feature flag and rename poller to streamer
The orchestration_event_push flag has been on in dogfood and staging long enough that the SSE event-push path is the only one we want to support. Polling is gone; the dual-mode `OrchestrationEventPoller` is renamed to `OrchestrationEventStreamer` and only opens SSE connections. - Removed the `OrchestrationEventPush` FeatureFlag variant and Cargo feature. - Renamed `orchestration_event_poller.rs` (and its tests) to `orchestration_event_streamer.rs`. Renamed the public type to `OrchestrationEventStreamer` and `handle_poll_result` (the shared event-injection sink) to `handle_event_batch`. - Removed polling-only state and methods: `poll_backoff_index`, `poll_in_flight`, `poll_and_inject`, `start_idle_poll_timer`, `POLL_BACKOFF_STEPS`, `EVENT_POLL_BATCH_LIMIT`. `event_cursor`, `pending_delivery`, and `conversation_statuses` are kept since they are also used by the SSE path. - Removed the dead `AIClient::poll_agent_events` trait method and impl in `server_api/ai.rs`. The `OrchestrationV2` flag continues to gate streamer instantiation and watched-run registration; behavior under v2 is unchanged. Co-Authored-By: Oz <oz-agent@warp.dev>
1 parent bc3fffa commit bc9ca7e

8 files changed

Lines changed: 66 additions & 240 deletions

File tree

app/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -902,7 +902,6 @@ active_conversation_requires_interaction = []
902902
incremental_auto_reload = []
903903
orchestration = []
904904
orchestration_v2 = ["orchestration"]
905-
orchestration_event_push = ["orchestration_v2"]
906905
pending_user_query_indicator = []
907906
queue_slash_command = []
908907
inline_menu_headers = []

app/src/ai/blocklist/action_model/execute/start_agent.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::ai::agent::{
66
AIAgentAction, AIAgentActionResultType, AIAgentActionType, LifecycleEventType,
77
StartAgentExecutionMode, StartAgentResult,
88
};
9-
use crate::ai::blocklist::orchestration_event_poller::OrchestrationEventPoller;
9+
use crate::ai::blocklist::orchestration_event_streamer::OrchestrationEventStreamer;
1010
use crate::ai::blocklist::orchestration_events::OrchestrationEventService;
1111
use crate::ai::blocklist::{BlocklistAIHistoryEvent, BlocklistAIHistoryModel};
1212
use warp_cli::agent::Harness;
@@ -110,8 +110,8 @@ impl StartAgentExecutor {
110110
agent_id: id.clone(),
111111
});
112112
if FeatureFlag::OrchestrationV2.is_enabled() {
113-
OrchestrationEventPoller::handle(ctx).update(ctx, |poller, ctx| {
114-
poller.register_watched_run_id(
113+
OrchestrationEventStreamer::handle(ctx).update(ctx, |streamer, ctx| {
114+
streamer.register_watched_run_id(
115115
pending.parent_conversation_id,
116116
id,
117117
ctx,

app/src/ai/blocklist/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ pub mod block;
55
pub mod code_block;
66
mod context_model;
77
mod controller;
8-
pub(crate) mod orchestration_event_poller;
8+
pub(crate) mod orchestration_event_streamer;
99
pub(crate) mod orchestration_events;
1010
mod passive_suggestions;
1111
pub(crate) mod task_status_sync_model;

app/src/ai/blocklist/orchestration_event_poller.rs renamed to app/src/ai/blocklist/orchestration_event_streamer.rs

Lines changed: 43 additions & 180 deletions
Large diffs are not rendered by default.

app/src/ai/blocklist/orchestration_event_poller_tests.rs renamed to app/src/ai/blocklist/orchestration_event_streamer_tests.rs

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ fn finish_restore_fetch_uses_server_cursor_when_sqlite_is_absent() {
218218
let server_api = ServerApiProvider::new_for_test().get();
219219

220220
let poller = app.add_singleton_model(|ctx| {
221-
OrchestrationEventPoller::new_with_clients_for_test(ai_client, server_api, ctx)
221+
OrchestrationEventStreamer::new_with_clients_for_test(ai_client, server_api, ctx)
222222
});
223223

224224
// Seed event_cursor as on_restored_conversations would before spawning
@@ -283,43 +283,37 @@ fn restored_inprogress_parent_defers_delivery_until_success() {
283283
// a permissive expectation prevents spurious panics either way.
284284
mock.expect_get_ambient_agent_task()
285285
.returning(|_| Ok(make_ambient_task_with_event_seq(None)));
286-
mock.expect_poll_agent_events()
287-
.returning(|_, _, _| Ok(vec![]));
288286
mock.expect_update_event_sequence_on_server()
289287
.returning(|_, _| Ok(()));
290288
let ai_client: Arc<dyn AIClient> = Arc::new(mock);
291289
let server_api = ServerApiProvider::new_for_test().get();
292290

293-
let poller = app.add_singleton_model(|ctx| {
294-
OrchestrationEventPoller::new_with_clients_for_test(ai_client, server_api, ctx)
291+
let streamer = app.add_singleton_model(|ctx| {
292+
OrchestrationEventStreamer::new_with_clients_for_test(ai_client, server_api, ctx)
295293
});
296294

297295
// Synchronous part of `on_restored_conversations`: cursor seeded,
298296
// own run_id watched. No event delivery yet because parent is
299297
// InProgress.
300-
poller.update(&mut app, |me, ctx| {
298+
streamer.update(&mut app, |me, ctx| {
301299
me.on_restored_conversations(vec![conversation_id], ctx);
302300
});
303-
poller.read(&app, |me, _| {
301+
streamer.read(&app, |me, _| {
304302
assert_eq!(me.event_cursor.get(&conversation_id).copied(), Some(0));
305303
assert!(
306304
me.watched_run_ids
307305
.get(&conversation_id)
308306
.is_some_and(|w| !w.is_empty()),
309307
"own run_id should have been registered as watched"
310308
);
311-
assert!(
312-
!me.poll_in_flight.contains(&conversation_id),
313-
"InProgress parent must not start polling"
314-
);
315309
assert!(
316310
me.sse_connections.is_empty(),
317311
"InProgress parent must not open SSE"
318312
);
319313
});
320314

321-
// Transitioning the conversation to Success should trigger event
322-
// delivery (poll_and_inject in the non-SSE default path).
315+
// Transitioning the conversation to Success should open an SSE
316+
// connection for event delivery.
323317
history_model.update(&mut app, |model, ctx| {
324318
model.update_conversation_status(
325319
terminal_view_id,
@@ -328,17 +322,17 @@ fn restored_inprogress_parent_defers_delivery_until_success() {
328322
ctx,
329323
);
330324
});
331-
poller.read(&app, |me, _| {
325+
streamer.read(&app, |me, _| {
332326
assert!(
333-
me.poll_in_flight.contains(&conversation_id),
334-
"Success transition with watched run_ids should start delivery"
327+
me.sse_connections.contains_key(&conversation_id),
328+
"Success transition with watched run_ids should open an SSE connection"
335329
);
336330
});
337331
});
338332
}
339333

340334
#[test]
341-
fn handle_poll_result_persists_max_seq_to_history_model() {
335+
fn handle_event_batch_persists_max_seq_to_history_model() {
342336
use crate::ai::agent::conversation::{AIConversation, AIConversationId};
343337
use crate::persistence::ModelEvent;
344338
use crate::server::server_api::ai::MockAIClient;
@@ -378,7 +372,7 @@ fn handle_poll_result_persists_max_seq_to_history_model() {
378372
let server_api = ServerApiProvider::new_for_test().get();
379373

380374
let poller = app.add_singleton_model(|ctx| {
381-
OrchestrationEventPoller::new_with_clients_for_test(ai_client, server_api, ctx)
375+
OrchestrationEventStreamer::new_with_clients_for_test(ai_client, server_api, ctx)
382376
});
383377

384378
// Build a poll batch with max sequence = 42. Use an unrecognized
@@ -405,7 +399,7 @@ fn handle_poll_result_persists_max_seq_to_history_model() {
405399
];
406400

407401
poller.update(&mut app, |me, ctx| {
408-
me.handle_poll_result(
402+
me.handle_event_batch(
409403
conversation_id,
410404
/* self_run_id */ "some-other-run",
411405
/* previous_cursor */ 0,
@@ -463,7 +457,7 @@ fn finish_restore_fetch_no_ops_when_conversation_deleted_mid_flight() {
463457
let server_api = ServerApiProvider::new_for_test().get();
464458

465459
let poller = app.add_singleton_model(|ctx| {
466-
OrchestrationEventPoller::new_with_clients_for_test(ai_client, server_api, ctx)
460+
OrchestrationEventStreamer::new_with_clients_for_test(ai_client, server_api, ctx)
467461
});
468462

469463
// Seed cursor as on_restored_conversations would.
@@ -542,7 +536,7 @@ fn finish_restore_fetch_reconnects_sse_when_children_added_to_open_connection()
542536
let server_api = ServerApiProvider::new_for_test().get();
543537

544538
let poller = app.add_singleton_model(|ctx| {
545-
OrchestrationEventPoller::new_with_clients_for_test(ai_client, server_api, ctx)
539+
OrchestrationEventStreamer::new_with_clients_for_test(ai_client, server_api, ctx)
546540
});
547541

548542
// Seed the state on_restored_conversations would have set up, then

app/src/lib.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1562,7 +1562,7 @@ fn initialize_app(
15621562
ctx.add_singleton_model(ai::blocklist::task_status_sync_model::TaskStatusSyncModel::new);
15631563
if warp_core::features::FeatureFlag::OrchestrationV2.is_enabled() {
15641564
ctx.add_singleton_model(
1565-
ai::blocklist::orchestration_event_poller::OrchestrationEventPoller::new,
1565+
ai::blocklist::orchestration_event_streamer::OrchestrationEventStreamer::new,
15661566
);
15671567
}
15681568

@@ -2718,8 +2718,6 @@ pub fn enabled_features() -> HashSet<FeatureFlag> {
27182718
FeatureFlag::Orchestration,
27192719
#[cfg(feature = "orchestration_v2")]
27202720
FeatureFlag::OrchestrationV2,
2721-
#[cfg(feature = "orchestration_event_push")]
2722-
FeatureFlag::OrchestrationEventPush,
27232721
#[cfg(feature = "pending_user_query_indicator")]
27242722
FeatureFlag::PendingUserQueryIndicator,
27252723
#[cfg(feature = "queue_slash_command")]

app/src/server/server_api/ai.rs

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -916,13 +916,6 @@ pub trait AIClient: 'static + Send + Sync {
916916
request: ListAgentMessagesRequest,
917917
) -> anyhow::Result<Vec<AgentMessageHeader>, anyhow::Error>;
918918

919-
async fn poll_agent_events(
920-
&self,
921-
run_ids: &[String],
922-
since_sequence: i64,
923-
limit: i32,
924-
) -> anyhow::Result<Vec<AgentRunEvent>, anyhow::Error>;
925-
926919
/// Persists the latest observed event sequence number for a run on the
927920
/// server. Used to keep the server-side cursor in sync with the client so
928921
/// that driver/cloud restores can resume without replaying events the
@@ -1879,22 +1872,6 @@ impl AIClient for ServerApi {
18791872
Ok(response)
18801873
}
18811874

1882-
async fn poll_agent_events(
1883-
&self,
1884-
run_ids: &[String],
1885-
since_sequence: i64,
1886-
limit: i32,
1887-
) -> anyhow::Result<Vec<AgentRunEvent>, anyhow::Error> {
1888-
let run_ids_param: String = run_ids
1889-
.iter()
1890-
.map(|id| format!("run_ids={}", urlencoding::encode(id)))
1891-
.collect::<Vec<_>>()
1892-
.join("&");
1893-
let url = format!("agent/events?{run_ids_param}&since={since_sequence}&limit={limit}");
1894-
let events: Vec<AgentRunEvent> = self.get_public_api(&url).await?;
1895-
Ok(events)
1896-
}
1897-
18981875
async fn update_event_sequence_on_server(
18991876
&self,
19001877
run_id: &str,

crates/warp_features/src/lib.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -710,15 +710,11 @@ pub enum FeatureFlag {
710710
Orchestration,
711711

712712
/// Enables server-side durable messaging for orchestration (v2).
713-
/// When enabled, messages and events are stored in Postgres and discovered
714-
/// via server polling instead of client-local conversation history.
713+
/// When enabled, messages and events are stored in Postgres and the client
714+
/// opens a persistent SSE connection to the server to receive events in
715+
/// real time.
715716
OrchestrationV2,
716717

717-
/// Enables SSE-based event push for orchestration instead of polling.
718-
/// When enabled the client opens a persistent SSE connection to the server
719-
/// and receives events in real time instead of short-polling.
720-
OrchestrationEventPush,
721-
722718
/// Shows a pending user query indicator during summarization when a follow-up
723719
/// prompt is queued via `/fork-and-compact` or `/compact-and`.
724720
PendingUserQueryIndicator,
@@ -906,7 +902,6 @@ pub const DOGFOOD_FLAGS: &[FeatureFlag] = &[
906902
FeatureFlag::RememberFastForwardState,
907903
FeatureFlag::HOANotifications,
908904
FeatureFlag::OrchestrationV2,
909-
FeatureFlag::OrchestrationEventPush,
910905
FeatureFlag::GeminiNotifications,
911906
FeatureFlag::LocalDockerSandbox,
912907
FeatureFlag::VerticalTabsSummaryMode,

0 commit comments

Comments
 (0)